In [None]:
# %cd ..
# %cd test/CellModeller-ingallslab
# %pip install -e .

In [1]:
import torch.nn.functional as F
import os
import torch
import numpy as np
import pandas as pd
import gzip
from torch.nn.utils.rnn import pad_sequence
from sklearn.preprocessing import MinMaxScaler, StandardScaler



def make_numpy_array(pickle_to_dict):
    
    columns_to_exclude = [0, 3, 4, 5, 7, 8, 10, 12, 17]
    #[Exclude time, label, cellType, divideFlag, growthRate, LifeHistory, targetVol, radius, strainRate]
    #Include [id, parent, cellAge, startVol, pos, length, dir, ends0, ends1, strainRate_rolling]

   
    property_dict = {'time': [], 'id': [], 'parent': [], 'label': [],
                     'cellType': [], 'divideFlag': [], 'cellAge': [], 'growthRate': [], 'LifeHistory': [],
                     'startVol': [], 'targetVol': [], 'pos': [], 'radius': [], 'length': [], 'dir': [],
                     'ends0': [], 'ends1': [], 'strainRate': [], 'strainRate_rolling': []}

 
    for key in pickle_to_dict['cellStates'].keys():
        cell_state = pickle_to_dict['cellStates'][key]

    
        property_dict['time'].append(cell_state.time)
        property_dict['id'].append(cell_state.id)
        property_dict['label'].append(cell_state.label)
        property_dict['cellType'].append(cell_state.cellType)
        property_dict['divideFlag'].append(cell_state.divideFlag)
        property_dict['cellAge'].append(cell_state.cellAge)
        property_dict['growthRate'].append(cell_state.growthRate)

       
        if hasattr(cell_state, 'LifeHistory'):
            property_dict['LifeHistory'].append(cell_state.LifeHistory)
        else:
            property_dict['LifeHistory'].append(0)

        property_dict['startVol'].append(cell_state.startVol)
        property_dict['targetVol'].append(cell_state.targetVol)
        property_dict['pos'].append(
            np.sqrt(np.sum(np.power(cell_state.pos, 2))))
        property_dict['radius'].append(cell_state.radius)
        property_dict['length'].append(cell_state.length)
        property_dict['dir'].append(np.arctan2(
            cell_state.dir[1], cell_state.dir[0]))
        property_dict['ends0'].append(
            np.sqrt(np.sum(np.power(cell_state.ends[0], 2))))
        property_dict['ends1'].append(
            np.sqrt(np.sum(np.power(cell_state.ends[1], 2))))
        property_dict['strainRate'].append(cell_state.strainRate)
        property_dict['strainRate_rolling'].append(
            cell_state.strainRate_rolling)

   
    for bac_id in property_dict['id']:
        if bac_id in pickle_to_dict['lineage']:
            property_dict['parent'].append(pickle_to_dict['lineage'][bac_id])
        else:
            property_dict['parent'].append(0)

    df_bacteria = pd.DataFrame.from_dict(property_dict)


    df_bacteria.fillna(0, inplace=True)

  
    df_bacteria = df_bacteria.astype(float)
    tensor=torch.tensor(df_bacteria.values)
    mask = torch.ones(tensor.shape[1], dtype=torch.bool)
    mask[columns_to_exclude] = False

    tensor_filtered = tensor[:, mask]

    return tensor_filtered



### Code to extract a single simulation without any normalization

In [2]:
def pickle_explorer(subfolder, tensor_list,step_size):
    
   # print(f"Processing subfolder: {subfolder}")
    pickle_files = sorted([f.path for f in os.scandir(
        subfolder) if f.is_file() and f.name.endswith('.pickle')])
    


    
    for pickle_file in pickle_files:
        #print(pickle_file)

        pickle_to_dict = np.load(os.path.join(
            subfolder, pickle_file), allow_pickle=True)
        df = make_numpy_array(pickle_to_dict)
        #print(df.shape)
        tensor_list.append(df)
        step_size.append(df.shape[0])
    
    return tensor_list,step_size

   


In [3]:
folder_to_explore = 'PATH_TO_A_SIMULATION_FOLDER' 
sims, step_sizes = pickle_explorer(folder_to_explore,[],[])    

## Below code is to save the data in csv format for visualization
# import csv
# csv_file_name = './outputs.csv'


# with open(csv_file_name, 'w', newline='') as csvfile:
#     csv_writer = csv.writer(csvfile)

#     #id, parent, cellAge, startVol, pos, length, dir, ends0, ends1, strainRate_rolling
#     csv_writer.writerow(['Timestep', 'Cell ID', 'Parent ID', 'Cell Age', 'Start Volume', 'Position', 'Length', 'Direction', 'Ends0', 'Ends1', 'Strain Rate Rolling'])

#     for i in range(len(sims)):
#         sim = sims[i]

#         for j in range(sim.shape[0]):

#             csv_writer.writerow([i, int(sim[j][0]), int(sim[j][1]), f"{sim[j][2]:.3f}", f"{sim[j][3]:.3f}", f"{sim[j][4]:.3f}", f"{sim[j][5]:.3f}", f"{sim[j][6]:.3f}", f"{sim[j][7]:.3f}", f"{sim[j][8]:.3f}", f"{sim[j][9]:.3f}"])


def scrollable_table(df):
    import plotly.graph_objs as go
    
    fig = go.Figure(data=[go.Table(
        header=dict(values=list(df.columns),
                    align='left'),
        cells=dict(values=[df[i] for i in df.columns],
                   align='left',
                   fill_color='white'))  # Set the background color of the table cells to dark grey
    ])

    # Set the background color of the entire figure to dark grey
    fig.update_layout(
        paper_bgcolor='black',
        plot_bgcolor='black'
    )

    return fig
scrollable_table(pd.read_csv('/home/stormageddon/MITACS/GNN/simulation_data.csv'))


In [4]:
import torch
import torch_geometric
import networkx as nx
import numpy as np
from torch_geometric.data import Data

def create_edge_index(cell_ids, parent_ids, positions, threshold_distance=1.0):
    
    cell_ids = torch.tensor(cell_ids, dtype=torch.long)
    parent_ids = torch.tensor(parent_ids, dtype=torch.long)
    positions = torch.tensor(positions, dtype=torch.float)

  
    id_to_index = {cell_id.item(): i for i, cell_id in enumerate(cell_ids)}

   
    edge_index = torch.stack([
        torch.tensor([id_to_index.get(parent_id.item(), -1) for parent_id in parent_ids]),
        torch.tensor([id_to_index.get(cell_id.item(), -1) for cell_id in cell_ids])
    ], dim=0)

    # Filter out edges where either the parent or child ID is not found
    valid_edges = (edge_index[0] != -1) & (edge_index[1] != -1)
    edge_index = edge_index[:, valid_edges]

    G = nx.Graph()
    for i, pos in enumerate(positions):
        G.add_node(i, pos=pos.numpy(), parent_info=parent_ids[i].item())

    # Add edges based on proximity (position threshold)
    for i, pos in enumerate(positions):
        for j, other_pos in enumerate(positions):
            if i != j and np.linalg.norm(pos.numpy() - other_pos.numpy()) < threshold_distance:
                G.add_edge(i, j)

    edges_from_nx = torch.tensor(list(G.edges)).t().contiguous()

    return edge_index, edges_from_nx


graph_list = []

for timestep_data in sims:
   
    node_features = timestep_data[:, 2:]  
    edge_index, edges_from_nx = create_edge_index(
        timestep_data[:, 0], timestep_data[:, 1], timestep_data[:, 4:6], threshold_distance=1.0
    )

    graph = Data(x=node_features, edge_index=edge_index, pos=timestep_data[:, 4:6], parent_info=timestep_data[:, 1])

    graph.edges_from_nx = edges_from_nx

    graph_list.append(graph)




To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).


To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).


To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).



In [68]:
## For normalization

from torch_geometric.transforms import NormalizeFeatures

In [5]:
import torch
import torch.nn.functional as F
from torch_geometric.data import Data, batch


time_series_data = sims

distance_threshold = 1.0

graph_data_list = []

def compute_distance(cell1, cell2):
    position1 = cell1[:, 4] 
    position2 = cell2[:, 4]
   
    distance = torch.norm(position1 - position2, dim=0)
    return distance


for timestep, cells in enumerate(time_series_data):
   
    cell_ids = cells[:, 0] 
    

    node_ids_list = cell_ids.tolist()
    node_features_list = cells[:, 1:].tolist()  # Exclude 'Cell ID'

    # Connect cells based on distance
    edges_list = []
    for i in range(len(cells)):
        for j in range(i + 1, len(cells)):
            distance = compute_distance(cells[i:i+1, :], cells[j:j+1, :])
            #print(distance)
            if distance < distance_threshold:
                edges_list.append((cell_ids[i].item(), cell_ids[j].item()))

    
    edges = torch.tensor(edges_list, dtype=torch.long).t().contiguous()

    graph_data = Data(x=torch.tensor(node_features_list, dtype=torch.float),
                     edge_index=edges)

    graph_data_list.append(graph_data)



an integer is required (got type float).  Implicit conversion to integers using __int__ is deprecated, and may be removed in a future version of Python.

