# Discrete Simulation

In [1]:
import numpy as np
import pandas as pd
from scipy.stats import dirichlet
from tqdm import tqdm
import anndata

# --- 1. Simulation Configuration ---
class Config:
    N_PROGENITOR = 1000
    N_FATE_A = 500
    N_FATE_B = 500
    N_CELLS = N_PROGENITOR + N_FATE_A + N_FATE_B

    N_HOUSEKEEPING = 500
    N_DE = 250
    N_IS = 250
    N_GENES = N_HOUSEKEEPING + N_DE + N_IS

    TIME = 20.0 # Time t for the kinetic equations

# --- 2. Kinetic Model Functions ---
def solve_u(alpha, beta, t):
    """Calculates unspliced counts at time t."""
    return (alpha / beta) * (1 - np.exp(-beta * t))

def solve_s(alpha, beta, gamma, t):
    """Calculates spliced counts at time t."""
    exp_beta_t = np.exp(-beta * t)
    exp_gamma_t = np.exp(-gamma * t)
    s = (alpha / gamma) * (1 - exp_gamma_t) - \
        (alpha / (gamma - beta)) * (exp_beta_t - exp_gamma_t)
    return s

# --- 3. Main Simulation Function (Corrected Logic) ---
def simulate_data_final(cfg):
    """
    This definitive version correctly generates gene-level counts, distributes only
    spliced counts to isoforms via multinomial sampling, and stores the
    original integer isoform counts.
    """
    print("ðŸ§¬ Setting up simulation...")

    cell_indices = {
        "Progenitor": np.arange(cfg.N_PROGENITOR),
        "FateA": np.arange(cfg.N_PROGENITOR, cfg.N_PROGENITOR + cfg.N_FATE_A),
        "FateB": np.arange(cfg.N_PROGENITOR + cfg.N_FATE_A, cfg.N_CELLS)
    }
    gene_indices = {
        "housekeeping": np.arange(cfg.N_HOUSEKEEPING),
        "de": np.arange(cfg.N_HOUSEKEEPING, cfg.N_HOUSEKEEPING + cfg.N_DE),
        "is": np.arange(cfg.N_HOUSEKEEPING + cfg.N_DE, cfg.N_GENES)
    }

    # Pre-allocate final gene-level matrices
    U_final = np.zeros((cfg.N_GENES, cfg.N_CELLS), dtype=int)
    S_final = np.zeros((cfg.N_GENES, cfg.N_CELLS), dtype=int)
    
    # This list will store dictionaries containing all isoform-level data
    isoform_data_list = []

    print(f"ðŸ”¬ Simulating {cfg.N_GENES} genes for {cfg.N_CELLS} cells...")

    for g in tqdm(range(cfg.N_GENES)):
        # Step A & B: Define gene type and kinetic parameters
        if g in gene_indices["housekeeping"]: 
            gene_type = "housekeeping"
        elif g in gene_indices["de"]: 
            gene_type = "de"
        else: 
            gene_type = "is"
        beta_g_cells = np.zeros(cfg.N_CELLS)
        gamma_g_cells = np.zeros(cfg.N_CELLS)
        alpha_g_cells = np.zeros(cfg.N_CELLS)
        if gene_type == "housekeeping":
            alpha_val = np.random.uniform(4, 6)
            alpha_g_cells[:] = alpha_val
            beta_val = np.random.uniform(1.8, 2.2)
            beta_g_cells[:] = beta_val
            gamma_val = np.random.uniform(0.9, 1.1)
            gamma_g_cells[:] = gamma_val
        elif gene_type == "de":
            alpha_1 = np.random.uniform(4, 6)
            alpha_2 = np.random.uniform(12, 18)
            alpha_3 = np.random.uniform(4, 6)
            alpha_g_cells[cell_indices["Progenitor"]] = alpha_1
            alpha_g_cells[cell_indices["FateB"]] = alpha_1
            alpha_g_cells[cell_indices["FateA"]] = alpha_2
            beta_1 = np.random.uniform(1.8, 2.2)
            beta_2 = np.random.uniform(3.6, 4.4)
            beta_3 = np.random.uniform(1.8, 2.2)
            beta_g_cells[cell_indices["Progenitor"]] = beta_1
            beta_g_cells[cell_indices["FateB"]] = beta_1
            beta_g_cells[cell_indices["FateA"]] = beta_2
            gamma_val = np.random.uniform(0.9, 1.1)
            gamma_g_cells[:] = gamma_val
        elif gene_type == "is":
            alpha_val = np.random.uniform(4, 6)
            alpha_g_cells[:] = alpha_val
            beta_val = np.random.uniform(1.8, 2.2)
            beta_g_cells[:] = beta_val
            gamma_val = np.random.uniform(0.9, 1.1)
            gamma_g_cells[:] = gamma_val
            
        # Step C: Generate ground truth isoform proportions
        k = np.random.randint(2, 6)
        pi_g = np.zeros((k, cfg.N_CELLS))
        if gene_type == "housekeeping" or gene_type == "de":
            j = np.random.randint(k)
            conc = np.ones(k)
            conc[j] = 20.0
            proportions = dirichlet(conc).rvs(1).flatten()
            pi_g = np.tile(proportions, (cfg.N_CELLS, 1)).T
        elif gene_type == "is":
            j1 = np.random.randint(k)
            conc1 = np.ones(k); conc1[j1] = 20.0
            props1 = dirichlet(conc1).rvs(1).flatten()
            pi_g[:, cell_indices["Progenitor"]] = np.tile(props1, (cfg.N_PROGENITOR, 1)).T
            pi_g[:, cell_indices["FateA"]] = np.tile(props1, (cfg.N_FATE_A, 1)).T
            j2_options = np.delete(np.arange(k), j1)
            j2 = np.random.choice(j2_options)
            conc2 = np.ones(k); conc2[j2] = 20.0
            props2 = dirichlet(conc2).rvs(1).flatten()
            pi_g[:, cell_indices["FateB"]] = np.tile(props2, (cfg.N_FATE_B, 1)).T

        # Step D: Calculate & Sample GENE-LEVEL counts
        u_gene_expected = solve_u(alpha_g_cells, beta_g_cells, cfg.TIME)
        s_gene_expected = solve_s(alpha_g_cells, beta_g_cells, gamma_g_cells, cfg.TIME)
        u_gene_expected[u_gene_expected < 0] = 0
        s_gene_expected[s_gene_expected < 0] = 0
        u_gene_counts = np.random.poisson(u_gene_expected)
        s_gene_counts = np.random.poisson(s_gene_expected)

        # Step E: Distribute SPLICED counts into ISOFORM counts
        spliced_isoform_counts = np.zeros((k, cfg.N_CELLS), dtype=int)
        for i in range(cfg.N_CELLS):
            if s_gene_counts[i] > 0:
                spliced_isoform_counts[:, i] = np.random.multinomial(s_gene_counts[i], pi_g[:, i])

        # Step F: Calculate OBSERVED proportions
        observed_proportions = np.divide(spliced_isoform_counts, s_gene_counts, out=np.zeros_like(spliced_isoform_counts, dtype=float), where=s_gene_counts!=0)

        # Step G: Store all data
        U_final[g, :] = u_gene_counts
        S_final[g, :] = s_gene_counts
        isoform_data_list.append({
            'gene_idx': g,
            'n_isoforms': k,
            'spliced_isoform_counts': spliced_isoform_counts,
            'proportions_ground_truth': pi_g,
            'proportions_observed': observed_proportions
        })
        
    print("Simulation complete!")
    P_data = pd.DataFrame(isoform_data_list)
    return U_final, S_final, P_data

In [26]:
# --- Run and Verify ---
if __name__ == '__main__':
    config = Config()
    np.random.seed(42)
    U_counts, S_counts, P_data = simulate_data_final(config)
    
    # --- Create AnnData object ---
    print("Assembling the AnnData object for saving...")

    # --- Prepare standard annotations ---
    # --- Prepare standard annotations ---
    obs_df = pd.DataFrame({'cell_state': (['Progenitor'] * config.N_PROGENITOR +
                                          ['FateA'] * config.N_FATE_A +
                                          ['FateB'] * config.N_FATE_B)})

    var_df = pd.DataFrame({'gene_category': (['Housekeeping'] * config.N_HOUSEKEEPING +
                                             ['DE'] * config.N_DE +
                                             ['IS'] * config.N_IS)})
    var_df.index = var_df.index.astype(str)

    # --- Correctly structure the isoform data ---
    p_data_sorted = P_data.sort_values('gene_idx').reset_index(drop=True)
    var_df['n_isoforms'] = p_data_sorted['n_isoforms'].values
    
    # --- Create the AnnData object ---
    adata = anndata.AnnData(
        X=(U_counts + S_counts).T,
        obs=obs_df,
        var=var_df
    )
    adata.layers['unspliced'] = U_counts.T
    adata.layers['spliced'] = S_counts.T
    
    # **** CORRECTED SECTION ****
    # Convert lists of ragged arrays into dictionaries for safe saving
    # The keys will be gene indices as strings to be safe
    adata.uns['spliced_isoform_counts'] = {
        str(idx): arr for idx, arr in enumerate(p_data_sorted['spliced_isoform_counts'])
    }
    adata.uns['proportions_ground_truth'] = {
        str(idx): arr for idx, arr in enumerate(p_data_sorted['proportions_ground_truth'])
    }
    adata.uns['proportions_observed'] = {
        str(idx): arr for idx, arr in enumerate(p_data_sorted['proportions_observed'])
    }
    
    print("AnnData object assembled correctly!")
    print("--- Unstructured Data Keys (.uns) ---")
    print(list(adata.uns.keys()))

ðŸ§¬ Setting up simulation...
ðŸ”¬ Simulating 1000 genes for 2000 cells...


100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 1000/1000 [00:02<00:00, 398.41it/s]

Simulation complete!
Assembling the AnnData object for saving...
AnnData object assembled correctly!
--- Unstructured Data Keys (.uns) ---
['spliced_isoform_counts', 'proportions_ground_truth', 'proportions_observed']





In [27]:
filename = "simulated_data.h5ad"

# Save the object to the file
adata.write_h5ad(filename)
print("Object saved successfully.")
adata_loaded = anndata.read_h5ad(filename)
print(adata_loaded)
print("Object loaded successfully.")

Object saved successfully.
AnnData object with n_obs Ã— n_vars = 2000 Ã— 1000
    obs: 'cell_state'
    var: 'gene_category', 'n_isoforms'
    uns: 'proportions_ground_truth', 'proportions_observed', 'spliced_isoform_counts'
    layers: 'spliced', 'unspliced'
Object loaded successfully.


In [24]:
adata.uns['proportions_observed']['980'][0][0:50] - adata.uns['proportions_observed']['980'][0][1550:1600]

array([1.        , 1.        , 0.90909091, 1.        , 1.        ,
       0.90909091, 1.        , 0.74603175, 1.        , 1.        ,
       0.875     , 0.8       , 0.9       , 1.        , 0.83333333,
       1.        , 1.        , 1.        , 1.        , 1.        ,
       1.        , 0.85714286, 1.        , 1.        , 1.        ,
       0.91666667, 1.        , 1.        , 0.85714286, 0.85714286,
       1.        , 1.        , 0.5       , 0.75      , 1.        ,
       1.        , 1.        , 1.        , 1.        , 1.        ,
       0.8       , 1.        , 1.        , 1.        , 1.        ,
       0.875     , 1.        , 0.9       , 1.        , 0.83333333])

# Continuous Simulation

In [9]:
import numpy as np
import pandas as pd
from scipy.stats import dirichlet
from tqdm import tqdm
import anndata

# Set a random seed for reproducibility
np.random.seed(42)

# --- 1. Simulation Configuration ---
class Config:
    N_PROGENITOR = 1000
    N_FATE_A = 500
    N_FATE_B = 500
    N_CELLS = N_PROGENITOR + N_FATE_A + N_FATE_B

    N_HOUSEKEEPING = 500
    N_DE = 250
    N_IS = 250
    N_GENES = N_HOUSEKEEPING + N_DE + N_IS

    # Time represents the duration of the process, not a cell's state
    TIME = 20.0

# --- 2. Kinetic Model & Helper Functions ---
def solve_u(alpha, beta, t):
    """Calculates unspliced counts at time t."""
    return (alpha / beta) * (1 - np.exp(-beta * t))

def solve_s(alpha, beta, gamma, t):
    """Calculates spliced counts at time t."""
    exp_beta_t = np.exp(-beta * t)
    exp_gamma_t = np.exp(-gamma * t)
    s = (alpha / gamma) * (1 - exp_gamma_t) - \
        (alpha / (gamma - beta)) * (exp_beta_t - exp_gamma_t)
    return s

def sigmoid(x, x0, k):
    """A simple sigmoid function."""
    return 1 / (1 + np.exp(-k * (x - x0)))

# --- 3. Main Simulation Function for Continuous Trajectory ---
def simulate_data_trajectory(cfg):
    """
    Simulates data along a continuous bifurcating trajectory.
    This version has improved transition logic for IS genes to ensure continuity.
    """
    print("ðŸ§¬ Setting up continuous trajectory simulation...")

    # --- Step A: Generate Pseudotime and Cell Fates ---
    cell_indices = {
        "Progenitor": np.arange(cfg.N_PROGENITOR),
        "FateA": np.arange(cfg.N_PROGENITOR, cfg.N_PROGENITOR + cfg.N_FATE_A),
        "FateB": np.arange(cfg.N_PROGENITOR + cfg.N_FATE_A, cfg.N_CELLS)
    }
    gene_indices = {
        "housekeeping": np.arange(cfg.N_HOUSEKEEPING),
        "de": np.arange(cfg.N_HOUSEKEEPING, cfg.N_HOUSEKEEPING + cfg.N_DE),
        "is": np.arange(cfg.N_HOUSEKEEPING + cfg.N_DE, cfg.N_GENES)
    }
    
    t_pseudo = np.zeros(cfg.N_CELLS)
    t_pseudo[cell_indices["Progenitor"]] = np.random.uniform(2, 4, cfg.N_PROGENITOR)
    t_pseudo[cell_indices["FateA"]] = np.random.uniform(4, 6, cfg.N_FATE_A)
    t_pseudo[cell_indices["FateB"]] = np.random.uniform(4, 6, cfg.N_FATE_B)


    U_final = np.zeros((cfg.N_GENES, cfg.N_CELLS), dtype=float)
    S_final = np.zeros((cfg.N_GENES, cfg.N_CELLS), dtype=float)
    isoform_data_list = []

    print(f"Simulating {cfg.N_GENES} genes along the trajectory...")

    for g in tqdm(range(cfg.N_GENES)):
        # --- Step B: Generate parameters as a function of pseudotime ---
        if g in gene_indices["housekeeping"]: 
            gene_type = "housekeeping"
        elif g in gene_indices["de"]: 
            gene_type = "de"
        else: 
            gene_type = "is"

        beta_g_cells = np.zeros(cfg.N_CELLS)
        gamma_g_cells = np.zeros(cfg.N_CELLS)
        alpha_g_cells = np.zeros(cfg.N_CELLS)
        
        if gene_type == "housekeeping":
            alpha_val = np.random.uniform(4, 6)
            alpha_g_cells[:] = alpha_val
            beta_val = np.random.uniform(1.8, 2.2)
            beta_g_cells[:] = beta_val
            gamma_val = np.random.uniform(0.9, 1.1)
            gamma_g_cells[:] = gamma_val
            # conc = np.ones(k_iso)
            # conc[j] = 20.0
            # proportions = dirichlet(conc).rvs(size = cfg.N_CELLS)
            # pi_g = proportions.T
        elif gene_type == "de":
            t_fate_a = t_pseudo[cell_indices["FateA"]]
            s = sigmoid(t_fate_a, x0=5, k=4)
            # print(s)
            alpha_1 = np.random.uniform(4, 6)
            alpha_2 = np.random.uniform(12, 18)
            alpha_g_cells[cell_indices["Progenitor"]] = alpha_1
            alpha_g_cells[cell_indices["FateB"]] = alpha_1
            alpha_g_cells[cell_indices["FateA"]] = alpha_1 + (alpha_2 - alpha_1) * s
            beta_1 = np.random.uniform(1.8, 2.2)
            beta_2 = np.random.uniform(3.6, 4.4)
            beta_g_cells[cell_indices["Progenitor"]] = beta_1
            beta_g_cells[cell_indices["FateB"]] = beta_1
            beta_g_cells[cell_indices["FateA"]] = beta_1 + (beta_2 - beta_1) * s
            gamma_val = np.random.uniform(0.9, 1.1)
            gamma_g_cells[:] = gamma_val
            # conc = np.ones(k_iso)
            # conc[j] = 20.0
            # proportions = dirichlet(conc).rvs(size = cfg.N_CELLS)
            # pi_g = proportions.T
        elif gene_type == "is":
            alpha_val = np.random.uniform(4, 6)
            alpha_g_cells[:] = alpha_val
            beta_val = np.random.uniform(1.8, 2.2)
            beta_g_cells[:] = beta_val
            gamma_val = np.random.uniform(0.9, 1.1)
            gamma_g_cells[:] = gamma_val
            
            # conc1 = np.ones(k_iso) 
            # conc1[j] = 20.0
            # # props1 = dirichlet(conc1).rvs(size = cfg.N_PROGENITOR).flatten()
            # pi_g[:, cell_indices["Progenitor"]] = dirichlet(conc1).rvs(size = cfg.N_PROGENITOR).T
            # pi_g[:, cell_indices["FateA"]] = dirichlet(conc1).rvs(size = cfg.N_FATE_A).T
            # j2_options = np.delete(np.arange(k_iso), j)
            # j2 = np.random.choice(j2_options)
            # conc2 = np.ones(k_iso)
            # conc2[j2] = 20.0
            # s = s.reshape(-1,1)
            # conc_change = (1 - s) * conc1 + s * conc2
            # pi_g[:, cell_indices["FateB"]] =  np.array([np.random.dirichlet(conc_row) for conc_row in conc_change]).T


        k_iso = np.random.randint(2, 6)
        pi_g = np.zeros((k_iso, cfg.N_CELLS))
        j1 = np.random.randint(k_iso)
        conc1 = np.ones(k_iso)
        conc1[j1] = 20.0
        if gene_type == "housekeeping" or gene_type == "de":
            proportions = dirichlet(conc1).rvs(size = cfg.N_CELLS)
            pi_g = proportions.T
        elif gene_type == "is":
            props1 = dirichlet(conc1).rvs(size = cfg.N_PROGENITOR)
            props2 = dirichlet(conc1).rvs(size = cfg.N_FATE_A)
            pi_g[:, cell_indices["Progenitor"]] = props1.T
            pi_g[:, cell_indices["FateA"]] = props2.T
            j2_options = np.delete(np.arange(k_iso), j1)
            j2 = np.random.choice(j2_options)
            conc2 = np.ones(k_iso)
            conc2[j2] = 20.0
            t_fate_b = t_pseudo[cell_indices["FateB"]]
            s = sigmoid(t_fate_b, x0=5, k=4) # s goes from 0 to 1
            s = s.reshape(-1,1)
            props_begin = dirichlet(conc1).rvs(size = cfg.N_FATE_B)
            props_end = dirichlet(conc2).rvs(size = cfg.N_FATE_B)
            props3 = (1 - s) * props_begin + s * props_end
            pi_g[:, cell_indices["FateB"]] = props3.T

        # --- Step C: Calculate counts based on time-dependent parameters ---
        u_gene_expected = solve_u(alpha_g_cells, beta_g_cells, t_pseudo)
        s_gene_expected = solve_s(alpha_g_cells, beta_g_cells, gamma_g_cells, t_pseudo)
        u_gene_expected[u_gene_expected < 0] = 0
        s_gene_expected[s_gene_expected < 0] = 0
        u_gene_counts = np.random.poisson(u_gene_expected)
        s_gene_counts = np.random.poisson(s_gene_expected)

        spliced_isoform_counts = np.zeros((k_iso, cfg.N_CELLS), dtype=int)
        for i in range(cfg.N_CELLS):
            if s_gene_counts[i] > 0:
                # print(spliced_isoform_counts[:, i])
                # print(s_gene_counts[i])
                # print(pi_g)
                spliced_isoform_counts[:, i] = np.random.multinomial(s_gene_counts[i], pi_g[:, i])

        observed_proportions = np.divide(spliced_isoform_counts, s_gene_counts, out=np.zeros_like(spliced_isoform_counts, dtype=float), where=s_gene_counts!=0)

        # --- Step D: Store all data ---
        U_final[g, :] = u_gene_counts
        S_final[g, :] = s_gene_counts
        isoform_data_list.append({
            'gene_idx': g, 'n_isoforms': k_iso, 'spliced_isoform_counts': spliced_isoform_counts,
            'proportions_ground_truth': pi_g, 'proportions_observed': observed_proportions
        })
        
    print("Simulation complete!")
    P_data = pd.DataFrame(isoform_data_list)
    
    return U_final, S_final, P_data, t_pseudo, cell_indices

## 4. Run Simulation and Create AnnData Object
if __name__ == '__main__':
    config = Config()
    U_counts, S_counts, P_data, pseudotime, fates = simulate_data_trajectory(config)
    fates = [key for key, value in fates.items() for _ in range(len(value))]
    
    print("Assembling the AnnData object...")

    # Prepare cell metadata (.obs) including the new trajectory info
    obs_df = pd.DataFrame({
        'cell_state': fates,
        'pseudotime': pseudotime
    })

    # Prepare gene metadata (.var)
    var_df = pd.DataFrame({'gene_category': (['Housekeeping'] * config.N_HOUSEKEEPING +
                                             ['DE'] * config.N_DE +
                                             ['IS'] * config.N_IS)})
    var_df.index = var_df.index.astype(str)
    
    p_data_sorted = P_data.sort_values('gene_idx').reset_index(drop=True)
    var_df['n_isoforms'] = p_data_sorted['n_isoforms'].values
    
    # Create the AnnData object
    adata = anndata.AnnData(
        X=(U_counts + S_counts).T,
        obs=obs_df,
        var=var_df
    )
    adata.layers['unspliced'] = U_counts.T
    adata.layers['spliced'] = S_counts.T
    
    # Store isoform data as dictionaries for safe saving
    adata.uns['spliced_isoform_counts'] = {str(idx): arr for idx, arr in enumerate(p_data_sorted['spliced_isoform_counts'])}
    adata.uns['proportions_ground_truth'] = {str(idx): arr for idx, arr in enumerate(p_data_sorted['proportions_ground_truth'])}
    adata.uns['proportions_observed'] = {str(idx): arr for idx, arr in enumerate(p_data_sorted['proportions_observed'])}
    
    print("\nðŸŽ‰ AnnData object assembled successfully! ðŸŽ‰")
    print("\n--- AnnData Summary ---")
    print(adata)
    print("\n--- Cell Annotations (.obs) ---")
    print(adata.obs.head())



ðŸ§¬ Setting up continuous trajectory simulation...
Simulating 1000 genes along the trajectory...


100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 1000/1000 [00:02<00:00, 389.33it/s]

Simulation complete!
Assembling the AnnData object...

ðŸŽ‰ AnnData object assembled successfully! ðŸŽ‰

--- AnnData Summary ---
AnnData object with n_obs Ã— n_vars = 2000 Ã— 1000
    obs: 'cell_state', 'pseudotime'
    var: 'gene_category', 'n_isoforms'
    uns: 'spliced_isoform_counts', 'proportions_ground_truth', 'proportions_observed'
    layers: 'unspliced', 'spliced'

--- Cell Annotations (.obs) ---
   cell_state  pseudotime
0  Progenitor    2.749080
1  Progenitor    3.901429
2  Progenitor    3.463988
3  Progenitor    3.197317
4  Progenitor    2.312037





In [14]:
filename = "simulated_data_continuous.h5ad"

# Save the object to the file
adata.write_h5ad(filename)
print("Object saved successfully.")
adata = anndata.read_h5ad(filename)
print(adata)
print("Object loaded successfully.")
adata.uns['spliced_isoform_counts']

Object saved successfully.
AnnData object with n_obs Ã— n_vars = 2000 Ã— 1000
    obs: 'cell_state', 'pseudotime'
    var: 'gene_category', 'n_isoforms'
    uns: 'proportions_ground_truth', 'proportions_observed', 'spliced_isoform_counts'
    layers: 'spliced', 'unspliced'
Object loaded successfully.


{'0': array([[1, 0, 0, ..., 0, 0, 0],
        [7, 4, 6, ..., 1, 5, 3],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 2, 0, 1],
        [0, 0, 0, ..., 0, 0, 1]], shape=(5, 2000)),
 '1': array([[0, 1, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 1, 0, 0],
        [3, 5, 4, ..., 6, 3, 3],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0]], shape=(5, 2000)),
 '10': array([[4, 3, 7, ..., 4, 7, 4],
        [0, 2, 0, ..., 0, 0, 1]], shape=(2, 2000)),
 '100': array([[2, 3, 3, ..., 4, 3, 6],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 1, 0, 1]], shape=(3, 2000)),
 '101': array([[0, 0, 0, ..., 0, 0, 0],
        [1, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
        [5, 4, 4, ..., 5, 4, 5]], shape=(5, 2000)),
 '102': array([[ 5,  7,  6, ..., 10,  5,  4],
        [ 2,  0,  0, ...,  0,  0,  0]], shape=(2, 2000)),
 '103': array([[0, 0, 0, ..., 0, 0, 0],
        [0, 0, 0, ..., 1, 0, 0],
        [0, 0, 0, ..., 0, 0, 0],
     

In [45]:
adata.layers['spliced'][:,0:3]

array([[3., 4., 2.],
       [5., 5., 5.],
       [2., 5., 5.],
       ...,
       [3., 2., 3.],
       [3., 9., 2.],
       [6., 4., 5.]])

In [6]:
import numpy as np
import pandas as pd
from scipy.stats import dirichlet
from tqdm import tqdm
import anndata

# Set a random seed for reproducibility
np.random.seed(42)

## 1. Simulation Configuration
class Config:
    N_PROGENITOR = 1000
    N_FATE_A = 500
    N_FATE_B = 500
    N_CELLS = N_PROGENITOR + N_FATE_A + N_FATE_B

    N_HOUSEKEEPING = 500
    N_DE = 250
    N_IS = 250
    N_GENES = N_HOUSEKEEPING + N_DE + N_IS

    TIME = 20.0 # Duration of the process

## 2. Kinetic Model & Helper Functions
def solve_u(alpha, beta, t):
    return (alpha / beta) * (1 - np.exp(-beta * t))

def solve_s(alpha, beta, gamma, t):
    exp_beta_t = np.exp(-beta * t)
    exp_gamma_t = np.exp(-gamma * t)
    s = (alpha / gamma) * (1 - exp_gamma_t) - \
        (alpha / (gamma - beta)) * (exp_beta_t - exp_gamma_t)
    return s

def sigmoid(x, x0, k):
    return 1 / (1 + np.exp(-k * (x - x0)))

## 3. Modular Functions for Parameter Setting
def set_kinetic_parameters(gene_type: str, cfg: Config, t_pseudo: np.ndarray, cell_fate: np.ndarray):
    alpha_g_cells = np.zeros(cfg.N_CELLS)
    beta_g = 0.0 
    gamma_g = 0.0
    
    if gene_type == "housekeeping":
        alpha_g_cells[:] = np.random.uniform(4, 6)
        beta_g = np.random.uniform(1.8, 2.2)
        gamma_g = np.random.uniform(0.9, 1.1)
    elif gene_type == "de":
        alpha_low = np.random.uniform(4, 6)
        alpha_high = np.random.uniform(12, 18)
        alpha_g_cells[:] = alpha_low
        t_fate_a = t_pseudo[cell_fate == "FateA"]
        s = sigmoid(t_fate_a, x0=4.0, k=4) # Centered in the middle of Fate A pseudotime
        alpha_g_cells[cell_fate == "FateA"] = alpha_low + (alpha_high - alpha_low) * s
        beta_g = np.random.uniform(1.8, 2.2) 
        gamma_g = np.random.uniform(0.9, 1.1)
    elif gene_type == "is":
        alpha_g_cells[:] = np.random.uniform(4, 6)
        beta_g = np.random.uniform(1.8, 2.2)
        gamma_g = np.random.uniform(0.9, 1.1)
    return alpha_g_cells, beta_g, gamma_g

def set_isoform_proportions(gene_type: str, k_iso: int, cfg: Config, t_pseudo: np.ndarray, cell_fate: np.ndarray):
    pi_g = np.zeros((k_iso, cfg.N_CELLS))
    j_progenitor = np.random.randint(k_iso)
    conc_progenitor = np.ones(k_iso)
    conc_progenitor[j_progenitor] = 20.0
    
    # Generate noisy proportions for non-transitioning cells
    non_fate_b_mask = (cell_fate == "Progenitor") | (cell_fate == "FateA")
    n_non_fate_b = np.sum(non_fate_b_mask)
    pi_g[:, non_fate_b_mask] = dirichlet(conc_progenitor).rvs(size=n_non_fate_b).T
    
    if gene_type == "is":
        j_fate_b_options = np.delete(np.arange(k_iso), j_progenitor)
        j_fate_b = np.random.choice(j_fate_b_options)
        conc_fate_b = np.ones(k_iso)
        conc_fate_b[j_fate_b] = 20.0
        
        # For each Fate B cell, interpolate its distribution and then sample
        fate_b_mask = cell_fate == "FateB"
        t_fate_b = t_pseudo[fate_b_mask]
        s_values = sigmoid(t_fate_b, x0=4.0, k=4) # Centered in middle of Fate B pseudotime
        
        fate_b_indices = np.where(fate_b_mask)[0]
        for i, cell_idx in enumerate(fate_b_indices):
            s = s_values[i]
            conc_interpolated = (1 - s) * conc_progenitor + s * conc_fate_b
            pi_g[:, cell_idx] = dirichlet(conc_interpolated).rvs(1).flatten()
            
    return pi_g

## 4. Main Simulation Function
def simulate_data_final(cfg):
    print("ðŸ§¬ Setting up continuous trajectory simulation...")
    t_pseudo = np.zeros(cfg.N_CELLS)
    cell_fate = np.full(cfg.N_CELLS, "Progenitor", dtype=object)
    t_pseudo[:cfg.N_PROGENITOR] = np.random.uniform(0, 3, cfg.N_PROGENITOR)
    fate_a_idx = slice(cfg.N_PROGENITOR, cfg.N_PROGENITOR + cfg.N_FATE_A)
    t_pseudo[fate_a_idx] = np.random.uniform(3, 5, cfg.N_FATE_A)
    cell_fate[fate_a_idx] = "FateA"
    fate_b_idx = slice(cfg.N_PROGENITOR + cfg.N_FATE_A, cfg.N_CELLS)
    t_pseudo[fate_b_idx] = np.random.uniform(3, 5, cfg.N_FATE_B)
    cell_fate[fate_b_idx] = "FateB"

    gene_indices = {"housekeeping": np.arange(cfg.N_HOUSEKEEPING), "de": np.arange(cfg.N_HOUSEKEEPING, cfg.N_HOUSEKEEPING + cfg.N_DE), "is": np.arange(cfg.N_HOUSEKEEPING + cfg.N_DE, cfg.N_GENES)}
    U_final = np.zeros((cfg.N_GENES, cfg.N_CELLS), dtype=int)
    S_final = np.zeros((cfg.N_GENES, cfg.N_CELLS), dtype=int)
    isoform_data_list = []

    print(f"ðŸ”¬ Simulating {cfg.N_GENES} genes along the trajectory...")
    for g in tqdm(range(cfg.N_GENES)):
        if g in gene_indices["housekeeping"]: gene_type = "housekeeping"
        elif g in gene_indices["de"]: gene_type = "de"
        else: gene_type = "is"
        
        alpha_g_cells, beta_g, gamma_g = set_kinetic_parameters(gene_type, cfg, t_pseudo, cell_fate)
        k_iso = np.random.randint(2, 6)
        pi_g = set_isoform_proportions(gene_type, k_iso, cfg, t_pseudo, cell_fate)
        
        u_gene_expected = solve_u(alpha_g_cells, beta_g, cfg.TIME)
        s_gene_expected = solve_s(alpha_g_cells, beta_g, gamma_g, cfg.TIME)
        u_gene_expected[u_gene_expected < 0] = 0
        s_gene_expected[s_gene_expected < 0] = 0
        u_gene_counts = np.random.poisson(u_gene_expected)
        s_gene_counts = np.random.poisson(s_gene_expected)
        spliced_isoform_counts = np.zeros((k_iso, cfg.N_CELLS), dtype=int)
        
        for i in range(cfg.N_CELLS):
            if s_gene_counts[i] > 0 and pi_g[:, i].sum() > 0: # Ensure proportions sum to 1
                # Normalize proportions just in case of floating point inaccuracies
                pvals = pi_g[:, i] / pi_g[:, i].sum()
                spliced_isoform_counts[:, i] = np.random.multinomial(s_gene_counts[i], pvals)
                
        observed_proportions = np.divide(spliced_isoform_counts, s_gene_counts, out=np.zeros_like(spliced_isoform_counts, dtype=float), where=s_gene_counts!=0)
        U_final[g, :] = u_gene_counts
        S_final[g, :] = s_gene_counts
        isoform_data_list.append({'gene_idx': g, 'n_isoforms': k_iso, 'spliced_isoform_counts': spliced_isoform_counts, 'proportions_ground_truth': pi_g, 'proportions_observed': observed_proportions})
    
    print("âœ… Simulation complete!")
    P_data = pd.DataFrame(isoform_data_list)
    return U_final, S_final, P_data, t_pseudo, cell_fate

## 5. Run Simulation and Create AnnData Object
if __name__ == '__main__':
    config = Config()
    U_counts, S_counts, P_data, pseudotime, fates = simulate_data_final(config)
    
    # The rest of the script for creating and saving adata is the same...

ðŸ§¬ Setting up continuous trajectory simulation...
ðŸ”¬ Simulating 1000 genes along the trajectory...


100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 1000/1000 [00:10<00:00, 92.25it/s]

âœ… Simulation complete!





In [7]:
obs_df = pd.DataFrame({
    'cell_state': fates,
    'pseudotime': pseudotime
})
# Prepare gene metadata (.var)
var_df = pd.DataFrame({'gene_category': (['Housekeeping'] * config.N_HOUSEKEEPING +
                                         ['DE'] * config.N_DE +
                                         ['IS'] * config.N_IS)})
var_df.index = var_df.index.astype(str)
    
p_data_sorted = P_data.sort_values('gene_idx').reset_index(drop=True)
var_df['n_isoforms'] = p_data_sorted['n_isoforms'].values

# Create the AnnData object
adata = anndata.AnnData(
    X=(U_counts + S_counts).T,
    obs=obs_df,
    var=var_df
)
adata.layers['unspliced'] = U_counts.T
adata.layers['spliced'] = S_counts.T
   
# Store isoform data as dictionaries for safe saving
adata.uns['spliced_isoform_counts'] = {str(idx): arr for idx, arr in enumerate(p_data_sorted['spliced_isoform_counts'])}
adata.uns['proportions_ground_truth'] = {str(idx): arr for idx, arr in enumerate(p_data_sorted['proportions_ground_truth'])}
adata.uns['proportions_observed'] = {str(idx): arr for idx, arr in enumerate(p_data_sorted['proportions_observed'])}
   
print("\nðŸŽ‰ AnnData object assembled successfully! ðŸŽ‰")
print("\n--- AnnData Summary ---")
print(adata)
print("\n--- Cell Annotations (.obs) ---")
print(adata.obs.head())


ðŸŽ‰ AnnData object assembled successfully! ðŸŽ‰

--- AnnData Summary ---
AnnData object with n_obs Ã— n_vars = 2000 Ã— 1000
    obs: 'cell_state', 'pseudotime'
    var: 'gene_category', 'n_isoforms'
    uns: 'spliced_isoform_counts', 'proportions_ground_truth', 'proportions_observed'
    layers: 'unspliced', 'spliced'

--- Cell Annotations (.obs) ---
   cell_state  pseudotime
0  Progenitor    1.123620
1  Progenitor    2.852143
2  Progenitor    2.195982
3  Progenitor    1.795975
4  Progenitor    0.468056




In [8]:
filename = "simulated_data_continuous.h5ad"

# Save the object to the file
adata.write_h5ad(filename)
print("Object saved successfully.")
adata_loaded = anndata.read_h5ad(filename)
print(adata_loaded)
print("Object loaded successfully.")

Object saved successfully.
AnnData object with n_obs Ã— n_vars = 2000 Ã— 1000
    obs: 'cell_state', 'pseudotime'
    var: 'gene_category', 'n_isoforms'
    uns: 'proportions_ground_truth', 'proportions_observed', 'spliced_isoform_counts'
    layers: 'spliced', 'unspliced'
Object loaded successfully.


In [101]:
adata_loaded.uns['proportions_observed']

{'0': array([[0.        , 0.        , 0.        , 0.        , 0.        ,
         0.1       , 0.        , 0.        , 0.2       , 0.        ,
         0.        , 0.        , 0.        , 0.        , 0.125     ,
         0.        , 0.        , 0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.33333333, 0.        , 0.        ,
         0.2       , 0.5       , 0.        , 0.2       , 0.        ,
         0.        , 0.        , 0.        , 0.        , 0.        ,
         0.        , 0.        , 0.        , 0.        , 0.        ],
        [1.        , 1.        , 0.66666667, 0.875     , 1.        ,
         0.7       , 0.5       , 1.        , 0.4       , 0.83333333,
         1.        , 1.        , 1.        , 1.        , 0.75      ,
         0.        , 0.        , 0.        , 0.        , 0.        ],
        [0.        , 0.        , 0.        , 0.125     , 0.        ,
         0.        , 0.        , 0.        , 0.        , 0.16666667,
         0.        , 0.   

In [99]:
adata_loaded.uns['spliced_isoform_counts']

{'0': array([[ 0,  0,  0,  0,  0,  1,  0,  0,  1,  0,  0,  0,  0,  0,  1,  0,
          0,  0,  0,  0],
        [ 0,  0,  2,  0,  0,  2,  2,  0,  1,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0],
        [11,  3,  4,  7,  7,  7,  2,  5,  2,  5,  1,  3,  4,  6,  6,  0,
          0,  0,  0,  0],
        [ 0,  0,  0,  1,  0,  0,  0,  0,  0,  1,  0,  0,  0,  0,  1,  0,
          0,  0,  0,  0],
        [ 0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  0,  0,  0,  0,  0,  0,
          0,  0,  0,  0]]),
 '1': array([[6, 4, 4, 1, 1, 7, 3, 2, 1, 5, 6, 3, 8, 1, 3, 0, 0, 0, 0, 0],
        [0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0]]),
 '2': array([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0],
        [6, 4, 4, 4, 5, 7, 3, 1, 4, 6, 5, 7, 3, 9, 4, 0, 0, 0, 0, 0]]),
 '3': array([[5, 2, 4, 7, 4, 2, 7, 7, 3, 3, 2, 4, 4, 5, 6, 0, 0,

In [110]:
np.random.multinomial(5, dirichlet([20.,1.,1.,1.]).rvs(1))

ValueError: object too deep for desired array

In [108]:
print(dirichlet([ 10.5,1.,1.,10.5]).rvs(1))

[[0.56159886 0.01822559 0.00870045 0.4114751 ]]
