In [3]:
import pandas as pd
import warnings
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import plotly.graph_objs as go
import plotly.io as pio

# Ignore FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)

# Define global variables and paths
specific_path = ['L3', 'Mi9', ['T4a', 'T4b', 'T4c', 'T4d'], 'Mi1', 'L1']
classification_file_path = './data/classification.txt'
connections_file_path = './data/connections.txt'
swc_base_path = './data/sk_lod1_783_healed'
output_dir = './results/L3_to_T4_to_L1/'

# Create output directory if it does not exist
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Read the data
classification = pd.read_csv(classification_file_path)
connections = pd.read_csv(connections_file_path)

# Create a dictionary mapping cell types to their respective root IDs
cell_types = ['Mi1', 'Mi9', 'L1', 'L3', 'T4a', 'T4b', 'T4c', 'T4d', 'Tm3']
root_ids_by_type = {cell_type: classification[(classification['cell_type'] == cell_type) & (classification['side'] == 'right')]['root_id'].tolist() for cell_type in cell_types}

# Get the list of L3 neuron IDs
l3_neuron_ids = root_ids_by_type.get('L3', [])

# DFS to find paths
def find_paths_dfs(neuron_id, connections, root_ids_by_type):
    stack = [(neuron_id, [neuron_id], 0)]  # Stack stores (neuron_id, path, current path index)
    paths = set()

    while stack:
        current_neuron, path, current_path_index = stack.pop()

        # Check if the path has reached the target (L1)
        if current_path_index >= len(specific_path) - 1:
            if current_neuron in root_ids_by_type.get('L1', []):
                paths.add(tuple(path))  # Add the valid path
            continue

        # Get the next cell type(s) in the path
        next_cell_type = specific_path[current_path_index + 1]
        next_cell_type_set = {root_id for cell_type in (next_cell_type if isinstance(next_cell_type, list) else [next_cell_type]) for root_id in root_ids_by_type.get(cell_type, [])}

        # Find connections for the next step in the path
        possible_connections = connections[(connections['pre_root_id'] == current_neuron) & (connections['post_root_id'].isin(next_cell_type_set))] if current_path_index < 2 else connections[(connections['post_root_id'] == current_neuron) & (connections['pre_root_id'].isin(next_cell_type_set))]

        # Traverse the connections and continue building the path
        for _, row in possible_connections.iterrows():
            next_neuron = row['post_root_id'] if current_path_index < 2 else row['pre_root_id']
            if next_neuron != current_neuron:  # Avoid cycles
                new_path = path + [next_neuron]
                stack.append((next_neuron, new_path, current_path_index + 1))

    return [list(path) for path in paths]  # Return all unique paths

# Find all paths starting from the given L3 neurons
def find_all_paths(start_neuron_ids, connections, root_ids_by_type):
    all_paths = {}
    for neuron_id in start_neuron_ids:
        if neuron_id in connections['pre_root_id'].values:  # Check if the neuron has outgoing connections
            paths = find_paths_dfs(neuron_id, connections, root_ids_by_type)
            if paths:
                all_paths[neuron_id] = paths  # Store the paths for this neuron
    return all_paths

# Find all paths for L3 neurons and save the result
all_paths = find_all_paths(l3_neuron_ids, connections, root_ids_by_type)
all_paths_path = os.path.join(output_dir, 'l1_all_paths.pkl')

# Save the paths to a pickle file
pd.to_pickle(all_paths, all_paths_path)


In [None]:
import pandas as pd
import warnings
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import plotly.graph_objs as go
import plotly.io as pio

# Ignore FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)

# Define global variables and paths
specific_path = ['L3', 'Tm9', ['T5a', 'T5b', 'T5c', 'T5d'], 'Tm1', 'L2']
classification_file_path = './data/classification.txt'
connections_file_path = './data/connections.txt'
swc_base_path = './data/sk_lod1_783_healed'
output_dir = './results/L3_to_T5_to_L2/'

# Create output directory if it does not exist
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Read the data
classification = pd.read_csv(classification_file_path)
connections = pd.read_csv(connections_file_path)

# Create a dictionary mapping cell types to their respective root IDs
cell_types = ['Tm1', 'Tm9', 'L2', 'L3', 'T5a', 'T5b', 'T5c', 'T5d', 'Tm3']
root_ids_by_type = {cell_type: classification[(classification['cell_type'] == cell_type) & (classification['side'] == 'right')]['root_id'].tolist() for cell_type in cell_types}

# Get the list of L3 neuron IDs
l3_neuron_ids = root_ids_by_type.get('L3', [])

# DFS to find paths
def find_paths_dfs(neuron_id, connections, root_ids_by_type):
    stack = [(neuron_id, [neuron_id], 0)]  # Stack stores (neuron_id, path, current path index)
    paths = set()

    while stack:
        current_neuron, path, current_path_index = stack.pop()

        # Check if the path has reached the target (L2)
        if current_path_index >= len(specific_path) - 1:
            if current_neuron in root_ids_by_type.get('L2', []):
                paths.add(tuple(path))  # Add the valid path
            continue

        # Get the next cell type(s) in the path
        next_cell_type = specific_path[current_path_index + 1]
        next_cell_type_set = {root_id for cell_type in (next_cell_type if isinstance(next_cell_type, list) else [next_cell_type]) for root_id in root_ids_by_type.get(cell_type, [])}

        # Find connections for the next step in the path
        possible_connections = connections[(connections['pre_root_id'] == current_neuron) & (connections['post_root_id'].isin(next_cell_type_set))] if current_path_index < 2 else connections[(connections['post_root_id'] == current_neuron) & (connections['pre_root_id'].isin(next_cell_type_set))]

        # Traverse the connections and continue building the path
        for _, row in possible_connections.iterrows():
            next_neuron = row['post_root_id'] if current_path_index < 2 else row['pre_root_id']
            if next_neuron != current_neuron:  # Avoid cycles
                new_path = path + [next_neuron]
                stack.append((next_neuron, new_path, current_path_index + 1))

    return [list(path) for path in paths]  # Return all unique paths

# Find all paths starting from the given L3 neurons
def find_all_paths(start_neuron_ids, connections, root_ids_by_type):
    all_paths = {}
    for neuron_id in start_neuron_ids:
        if neuron_id in connections['pre_root_id'].values:  # Check if the neuron has outgoing connections
            paths = find_paths_dfs(neuron_id, connections, root_ids_by_type)
            if paths:
                all_paths[neuron_id] = paths  # Store the paths for this neuron
    return all_paths

# Find all paths for L3 neurons and save the result
all_paths = find_all_paths(l3_neuron_ids, connections, root_ids_by_type)
all_paths_path = os.path.join(output_dir, 'L2_all_paths.pkl')

# Save the paths to a pickle file
pd.to_pickle(all_paths, all_paths_path)


In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
import matplotlib.gridspec as gridspec

# Ignore FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)

# Set dataset paths and weight paths
classification_file_path = './data/classification.txt'
swc_base_path = './data/sk_lod1_783_healed'
output_dir = './result/combined_distance_distribution/'

# Create the output directory if it does not exist
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Read the classification data
classification = pd.read_csv(classification_file_path)

# Define cell types and subtypes
cell_types = ['L3', 'L2', 'L1']
t4_subtypes = ['T4a', 'T4b', 'T4c', 'T4d']
t5_subtypes = ['T5a', 'T5b', 'T5c', 'T5d']

# Create a dictionary to store root IDs by cell type
root_ids_by_type = {cell_type: classification[(classification['cell_type'] == cell_type) & (classification['side'] == 'right')]['root_id'].tolist() for cell_type in cell_types + t4_subtypes + t5_subtypes}

# Read column assignment data for Cartesian coordinates
column_assignment_file_path = './data/column_assignment.txt'
df = pd.read_csv(column_assignment_file_path)

# Function to get Cartesian coordinates (x, y) for a given root_id
def get_cartesian_by_root_id(root_id):
    result = df[df['root_id'] == root_id]
    if not result.empty:
        p, q = result[['x', 'y']].values[0]
        x = 2 * p + 1 if q % 2 == 1 else 2 * p
        y = q / 2
        return np.array([x, y])
    else:
        return None

# Function to create hexagonal markers
def create_hexagon_marker(size=1, orientation=0):
    angles = np.linspace(0, 2 * np.pi, 6, endpoint=False) + np.radians(orientation)
    return np.column_stack([np.cos(angles), np.sin(angles)]) * size

# Function to calculate directions and distances for each path
def calculate_all_directions(paths, root_ids_by_type, subtypes):
    directions_by_subtype = {subtype: [] for subtype in subtypes}
    for neuron_id, neuron_paths in tqdm(paths.items(), desc="Calculating directions"):
        l3_soma = get_cartesian_by_root_id(neuron_id)
        if l3_soma is not None:
            for path in neuron_paths:
                if len(path) > 2:
                    t_type_id = path[2]
                    for subtype in subtypes:
                        if t_type_id in root_ids_by_type[subtype]:
                            target_soma = get_cartesian_by_root_id(path[-1])
                            if target_soma is not None:
                                direction = target_soma - l3_soma
                                distance = np.linalg.norm(direction)
                                T_soma = get_cartesian_by_root_id(t_type_id)
                                if T_soma is not None and distance > 0:
                                    directions_by_subtype[subtype].append((T_soma, direction, distance))
    return directions_by_subtype

# Function to calculate average distances and update the coordinates dictionary
def calculate_average_distances(all_xy_coords, distance_sums, distance_counts):
    average_distances = {key: distance_sums[key] / distance_counts[key] for key in distance_sums.keys()}
    for key, avg_distance in average_distances.items():
        all_xy_coords[key] = avg_distance

# Function to plot direction maps, adjusting color saturation inversely with distance
def plot_directions(directions_by_subtype, ax, title):
    all_xy_coords = {tuple(get_cartesian_by_root_id(root_id)): -1 for root_id in df['root_id'].dropna() if get_cartesian_by_root_id(root_id) is not None}
    norm = plt.Normalize(-np.pi, np.pi)  # Normalize angles
    cmap = plt.cm.hsv  # Use HSV color map
    direction_sums, distance_sums, distance_counts = {}, {}, {}

    # Sum the directions and distances for each point
    for subtype, directions in directions_by_subtype.items():
        for T_soma, direction, distance in directions:
            key = tuple(T_soma)
            if key not in direction_sums:
                direction_sums[key] = np.array(direction)
                distance_sums[key] = distance
                distance_counts[key] = 1
            else:
                direction_sums[key] += np.array(direction)
                distance_sums[key] += distance
                distance_counts[key] += 1

    # Calculate the average distances
    calculate_average_distances(all_xy_coords, distance_sums, distance_counts)

    # Find the maximum and minimum distances for normalization
    max_distance = max(v for v in all_xy_coords.values() if v >= 0)
    min_distance = min(v for v in all_xy_coords.values() if v >= 0)

    # Create hexagon marker
    hexagon_marker = create_hexagon_marker(size=1, orientation=0)

    # Plot the points, coloring them based on the average distance and direction
    for (x, y), distance in all_xy_coords.items():
        if distance < 0:
            ax.scatter(x, y, color='gray', s=45, marker=hexagon_marker, edgecolor='gray')
        else:
            normalized_distance = (distance - min_distance) / (max_distance - min_distance)
            alpha = 1

            summed_direction = direction_sums[(x, y)]
            angle = np.arctan2(summed_direction[1], summed_direction[0])
            color = cmap(norm(angle))
            ax.scatter(x, y, color=color, alpha=alpha, s=45, marker=hexagon_marker, edgecolor=color)

    # Set plot title and remove axis labels
    ax.set_title(title, fontsize=30, fontname='Times New Roman')
    ax.set_aspect('equal')
    ax.grid(False)
    ax.set_xticks([])
    ax.set_yticks([])
    for spine in ax.spines.values():
        spine.set_visible(False)

# Function to plot a color wheel with a square boundary
def plot_color_wheel(ax):
    theta = np.linspace(-np.pi, np.pi, 500)
    r = np.linspace(0, 1, 500)
    T, R = np.meshgrid(theta, r)
    norm = plt.Normalize(-np.pi, np.pi)  # Normalize angles
    cmap = plt.cm.hsv  # Use HSV color map
    angle_colors = cmap(norm(T))

    # Adjust color saturation
    for i in range(angle_colors.shape[0]):
        angle_colors[i, :, :-1] = angle_colors[i, :, :-1] * R[i, :, None] + (1 - R[i, :, None])

    ax.pcolormesh(T, R, angle_colors, shading='auto')
    ax.set_aspect('equal')
    ax.grid(False)
    ax.set_xticks([])
    ax.set_yticks([])
    for spine in ax.spines.values():
        spine.set_visible(False)

# Read the T4 and T5 path data
t4_paths_path = './result/L3_to_T4_to_L1/l1_all_paths.pkl'
t5_paths_path = './result/L3_to_T5_to_L2/l2_all_paths.pkl'
t4_paths = pd.read_pickle(t4_paths_path)
t5_paths = pd.read_pickle(t5_paths_path)

# Calculate directions and distances for T4 and T5 paths
l3_to_l1_dirs_and_dists_t4 = calculate_all_directions(t4_paths, root_ids_by_type, t4_subtypes)
l3_to_l2_dirs_and_dists_t5 = calculate_all_directions(t5_paths, root_ids_by_type, t5_subtypes)

# Create a 4x9 layout, with the last column for the color wheel
fig = plt.figure(figsize=(20, 8))
gs = gridspec.GridSpec(4, 9, figure=fig, wspace=0.1, hspace=0.1)

# Plot direction maps for T4 subtypes
for i, subtype in enumerate(t4_subtypes):
    ax = fig.add_subplot(gs[0:2, (2 * i):(2 * i + 2)])
    plot_directions({subtype: l3_to_l1_dirs_and_dists_t4[subtype]}, ax, f"{subtype}")

# Plot direction maps for T5 subtypes
for i, subtype in enumerate(t5_subtypes):
    ax = fig.add_subplot(gs[2:4, (2 * i):(2 * i + 2)])
    plot_directions({subtype: l3_to_l2_dirs_and_dists_t5[subtype]}, ax, f"{subtype}")

# Plot the color wheel with a square boundary
ax_color_wheel = fig.add_subplot(gs[0:4, 8], projection='polar', frame_on=False)
plot_color_wheel(ax_color_wheel)

# Adjust layout and display the plot
plt.subplots_adjust(left=0.05, right=0.95, top=0.95, bottom=0.05)
plt.show()