Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
sronilsson committed Mar 25, 2024
2 parents 4e248cf + 29f73f4 commit 80e5f48
Show file tree
Hide file tree
Showing 11 changed files with 815 additions and 277 deletions.
257 changes: 194 additions & 63 deletions simba/data_processors/spontaneous_alternation_calculator.py

Large diffs are not rendered by default.

128 changes: 94 additions & 34 deletions simba/mixins/feature_extraction_supplement_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,9 @@ def velocity_aggregator(
)

@staticmethod
def spontaneous_alternations(data: pd.DataFrame,
arm_names: List[str],
center_name: str) -> Tuple[Dict[Union[str, Tuple[int]], int]]:

def spontaneous_alternations(
data: pd.DataFrame, arm_names: List[str], center_name: str
) -> Tuple[Dict[Union[str, Tuple[int]], int]]:
"""
Detects spontaneous alternations between a set of user-defined ROIs.
Expand Down Expand Up @@ -473,14 +472,27 @@ def spontaneous_alternations(data: pd.DataFrame,
>>> spontanous_alternations = FeatureExtractionSupplemental.spontaneous_alternations(data=df, shape_names=['left', 'top', 'right', 'bottom'])
"""

def get_sliding_alternation(data: np.ndarray) -> Tuple[Union[Dict[int, List[int]], Dict[int, List[int]], Dict[Tuple[int], List[int]]]]:
stride, same_arm_return_cnt, alternate_arm_return_cnt = len(arm_names) - 1, 0, 0
def get_sliding_alternation(
data: np.ndarray,
) -> Tuple[
Union[
Dict[int, List[int]], Dict[int, List[int]], Dict[Tuple[int], List[int]]
]
]:
stride, same_arm_return_cnt, alternate_arm_return_cnt = (
len(arm_names) - 1,
0,
0,
)
alternation_cnt = 0
alternations, same_arm_returns, alternate_arm_returns = {}, {}, {}
for i in list(itertools.permutations(arm_names)): alternations[i] = []
for i in arm_names: same_arm_returns[i] = []; alternate_arm_returns[i] = []
for i in list(itertools.permutations(arm_names)):
alternations[i] = []
for i in arm_names:
same_arm_returns[i] = []
alternate_arm_returns[i] = []
for i in range(stride, data.shape[0]):
current, priors = data[i], data[i - (stride):i]
current, priors = data[i], data[i - (stride) : i]
sequence = np.append(priors[:, 0].flatten(), current[0])
if np.unique(sequence).shape[0] == sequence.shape[0]:
alternations[tuple(sequence)].append(current[1])
Expand All @@ -491,41 +503,89 @@ def get_sliding_alternation(data: np.ndarray) -> Tuple[Union[Dict[int, List[int]
elif sequence[-1] in sequence[:-1]:
alternate_arm_returns[sequence[-1]].append(current[1])
alternate_arm_return_cnt += 1
return alternate_arm_returns, same_arm_returns, alternations, alternate_arm_return_cnt, same_arm_return_cnt, alternation_cnt
return (
alternate_arm_returns,
same_arm_returns,
alternations,
alternate_arm_return_cnt,
same_arm_return_cnt,
alternation_cnt,
)

check_instance(source=FeatureExtractionSupplemental.spontaneous_alternations.__name__, instance=data, accepted_types=(pd.DataFrame,))
check_valid_lst(data=arm_names, source=FeatureExtractionSupplemental.spontaneous_alternations.__name__, valid_dtypes=(str,), valid_values=data.columns)
check_str(name='center name', value=center_name, options=data.columns)
if center_name in arm_names: InvalidInputError(msg='One ROI is defined both as an arm ans as the center', source=FeatureExtractionSupplemental.spontaneous_alternations.__name__)
if len(list(set(arm_names))) != len(arm_names): InvalidInputError(msg=f'Each arm has to be unique but got {arm_names}', source=FeatureExtractionSupplemental.spontaneous_alternations.__name__)
check_instance(
source=FeatureExtractionSupplemental.spontaneous_alternations.__name__,
instance=data,
accepted_types=(pd.DataFrame,),
)
check_valid_lst(
data=arm_names,
source=FeatureExtractionSupplemental.spontaneous_alternations.__name__,
valid_dtypes=(str,),
valid_values=data.columns,
)
check_str(name="center name", value=center_name, options=data.columns)
if center_name in arm_names:
InvalidInputError(
msg="One ROI is defined both as an arm ans as the center",
source=FeatureExtractionSupplemental.spontaneous_alternations.__name__,
)
if len(list(set(arm_names))) != len(arm_names):
InvalidInputError(
msg=f"Each arm has to be unique but got {arm_names}",
source=FeatureExtractionSupplemental.spontaneous_alternations.__name__,
)
roi_names = arm_names + [center_name]
data_df = data[roi_names]
invalid_vals = list(set(np.unique(data_df.values.flatten())) - {0, 1})
if len(invalid_vals) > 0:
raise CountError(msg=f'When computing spontaneous alternation, ROI fields can only be 0 or 1. Found {invalid_vals}', source=FeatureExtractionSupplemental.spontaneous_alternations.__name__)
raise CountError(
msg=f"When computing spontaneous alternation, ROI fields can only be 0 or 1. Found {invalid_vals}",
source=FeatureExtractionSupplemental.spontaneous_alternations.__name__,
)
multiple_rois_frm_idx = np.argwhere(np.sum(data_df.values, axis=1) > 1)
if multiple_rois_frm_idx.shape[0] > 0:
raise CountError(msg=f'When computing spontaneous alternation, animals should only exist in <=1 ROIs in any one frame. In {multiple_rois_frm_idx.shape[0]} frames, the animal exist in more than one ROI.', source=FeatureExtractionSupplemental.spontaneous_alternations.__name__)
bout_df = detect_bouts(data_df=data_df, target_lst=data_df.columns, fps=1)[
['Event', 'Start_frame']].sort_values(['Start_frame']).reset_index(drop=True)
shifted_ = pd.concat([bout_df, bout_df.shift(-1).add_suffix('_shifted').reset_index(drop=True)], axis=1)[
['Event', 'Event_shifted']].values
raise CountError(
msg=f"When computing spontaneous alternation, animals should only exist in <=1 ROIs in any one frame. In {multiple_rois_frm_idx.shape[0]} frames, the animal exist in more than one ROI.",
source=FeatureExtractionSupplemental.spontaneous_alternations.__name__,
)
bout_df = (
detect_bouts(data_df=data_df, target_lst=data_df.columns, fps=1)[
["Event", "Start_frame"]
]
.sort_values(["Start_frame"])
.reset_index(drop=True)
)
shifted_ = pd.concat(
[bout_df, bout_df.shift(-1).add_suffix("_shifted").reset_index(drop=True)],
axis=1,
)[["Event", "Event_shifted"]].values
unique_counts = [len(list(set(list(x)))) for x in shifted_]
drop_idx = np.argwhere(np.array(unique_counts) == 1) + 1
bout_df = bout_df.drop(drop_idx.flatten(), axis=0).reset_index(drop=True)
arm_entry_sequence = bout_df[bout_df['Event'] != center_name].values
(alternate_arm_returns, same_arm_returns, alternations, alternate_arm_return_cnt, same_arm_return_cnt, alternation_cnt) = get_sliding_alternation(data=arm_entry_sequence)
pct_alternation = alternation_cnt / (arm_entry_sequence.shape[0] - (len(arm_names) - 1))

return {'pct_alternation': pct_alternation * 100,
'alternation_cnt': alternation_cnt,
'error_cnt': same_arm_return_cnt + alternate_arm_return_cnt,
'same_arm_returns_cnt': same_arm_return_cnt,
'alternate_arm_returns_cnt': alternate_arm_return_cnt,
'same_arm_returns_dict': same_arm_returns,
'alternate_arm_returns_dict': alternate_arm_returns,
'alternations_dict': alternations,
'arm_entry_sequence': arm_entry_sequence[:, 0].flatten()}
arm_entry_sequence = bout_df[bout_df["Event"] != center_name].values
(
alternate_arm_returns,
same_arm_returns,
alternations,
alternate_arm_return_cnt,
same_arm_return_cnt,
alternation_cnt,
) = get_sliding_alternation(data=arm_entry_sequence)
pct_alternation = alternation_cnt / (
arm_entry_sequence.shape[0] - (len(arm_names) - 1)
)

return {
"pct_alternation": pct_alternation * 100,
"alternation_cnt": alternation_cnt,
"error_cnt": same_arm_return_cnt + alternate_arm_return_cnt,
"same_arm_returns_cnt": same_arm_return_cnt,
"alternate_arm_returns_cnt": alternate_arm_return_cnt,
"same_arm_returns_dict": same_arm_returns,
"alternate_arm_returns_dict": alternate_arm_returns,
"alternations_dict": alternations,
"arm_entry_sequence": arm_entry_sequence[:, 0].flatten(),
}

@staticmethod
def find_path_loops(data: np.ndarray) -> Dict[Tuple[int], List[int]]:
Expand Down
73 changes: 55 additions & 18 deletions simba/mixins/geometry_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
import imutils
import numpy as np
import pandas as pd
import shapely
from numba import njit, prange
from shapely.geometry import (GeometryCollection, LineString, MultiLineString,
MultiPoint, MultiPolygon, Point, Polygon)
from shapely.ops import linemerge, split, triangulate, unary_union
import shapely

try:
from typing_extensions import Literal
except:
Expand Down Expand Up @@ -804,7 +805,12 @@ def view_shapes(
>>> img = GeometryMixin.view_shapes(shapes=[line_1, polygon_1, multipolygon_1])
"""

check_valid_lst(data=shapes, source=GeometryMixin.view_shapes.__name__, valid_dtypes=(LineString, Polygon, MultiPolygon, MultiLineString, Point), min_len=1)
check_valid_lst(
data=shapes,
source=GeometryMixin.view_shapes.__name__,
valid_dtypes=(LineString, Polygon, MultiPolygon, MultiLineString, Point),
min_len=1,
)
max_vertices = find_max_vertices_coordinates(shapes=shapes, buffer=200)
if bg_img is None:
if bg_clr is None:
Expand Down Expand Up @@ -1168,8 +1174,8 @@ def multiframe_bodyparts_to_polygon(
raise_error=True,
)
parallel_offset = np.ceil(parallel_offset * pixels_per_mm)
if parallel_offset < 1: parallel_offset = 1

if parallel_offset < 1:
parallel_offset = 1

if core_cnt == -1:
core_cnt = find_core_cnt()[0]
Expand Down Expand Up @@ -3628,7 +3634,12 @@ def linear_frechet_distance(
return ca[n_p - 1, n_q - 1]

@staticmethod
def simba_roi_to_geometries(rectangles_df: pd.DataFrame, circles_df: pd.DataFrame, polygons_df: pd.DataFrame, color: Optional[bool] = False) -> dict:
def simba_roi_to_geometries(
rectangles_df: pd.DataFrame,
circles_df: pd.DataFrame,
polygons_df: pd.DataFrame,
color: Optional[bool] = False,
) -> dict:
"""
Convert SimBA dataframes holding ROI geometries to nested dictionary holding Shapley polygons.
Expand All @@ -3655,28 +3666,52 @@ def simba_roi_to_geometries(rectangles_df: pd.DataFrame, circles_df: pd.DataFram
accepted_types=(pd.DataFrame,),
)
for i in [rectangles_df, circles_df, polygons_df]:
check_that_column_exist(df=i, column_name=["Video", "Name", "Tags", 'Color BGR'], file_name="")
check_that_column_exist(
df=i, column_name=["Video", "Name", "Tags", "Color BGR"], file_name=""
)
results_roi, results_clr = {}, {}
for video_name in rectangles_df["Video"].unique():
if video_name not in results_roi.keys(): results_roi[video_name] = {}; results_clr[video_name] = {}
video_shapes = rectangles_df[["Tags", "Name", 'Color BGR']][rectangles_df["Video"] == video_name]
if video_name not in results_roi.keys():
results_roi[video_name] = {}
results_clr[video_name] = {}
video_shapes = rectangles_df[["Tags", "Name", "Color BGR"]][
rectangles_df["Video"] == video_name
]
for shape_name in video_shapes["Name"].unique():
shape_data = video_shapes[video_shapes["Name"] == shape_name].reset_index(drop=True)
tags, name = (list(shape_data["Tags"].values[0].values()), shape_data["Name"].values[0])
results_roi[video_name][name] = Polygon(Polygon(tags).convex_hull.exterior.coords)
shape_data = video_shapes[
video_shapes["Name"] == shape_name
].reset_index(drop=True)
tags, name = (
list(shape_data["Tags"].values[0].values()),
shape_data["Name"].values[0],
)
results_roi[video_name][name] = Polygon(
Polygon(tags).convex_hull.exterior.coords
)
results_clr[video_name][name] = shape_data["Color BGR"].values[0]
for video_name in polygons_df["Video"].unique():
if video_name not in results_roi.keys():
results_roi[video_name] = {}; results_clr[video_name] = {}
video_shapes = polygons_df[["Tags", "Name", 'Color BGR']][polygons_df["Video"] == video_name]
results_roi[video_name] = {}
results_clr[video_name] = {}
video_shapes = polygons_df[["Tags", "Name", "Color BGR"]][
polygons_df["Video"] == video_name
]
for shape_name in video_shapes["Name"].unique():
shape_data = video_shapes[ video_shapes["Name"] == shape_name].reset_index(drop=True)
tags, name = (list(shape_data["Tags"].values[0].values()), shape_data["Name"].values[0])
results_roi[video_name][name] = Polygon(Polygon(tags).convex_hull.exterior.coords)
shape_data = video_shapes[
video_shapes["Name"] == shape_name
].reset_index(drop=True)
tags, name = (
list(shape_data["Tags"].values[0].values()),
shape_data["Name"].values[0],
)
results_roi[video_name][name] = Polygon(
Polygon(tags).convex_hull.exterior.coords
)
results_clr[video_name][name] = shape_data["Color BGR"].values[0]
for video_name in circles_df["Video"].unique():
if video_name not in results_roi.keys():
results_roi[video_name] = {}; results_clr[video_name] = {}
results_roi[video_name] = {}
results_clr[video_name] = {}
video_shapes = circles_df[["Tags", "Name", "Color BGR"]][
circles_df["Video"] == video_name
]
Expand All @@ -3689,7 +3724,9 @@ def simba_roi_to_geometries(rectangles_df: pd.DataFrame, circles_df: pd.DataFram
shape_data["Name"].values[0],
shape_data["radius"].values[0],
)
results_roi[video_name][name] = Point(tags["Center tag"]).buffer(distance=radius)
results_roi[video_name][name] = Point(tags["Center tag"]).buffer(
distance=radius
)
results_clr[video_name][name] = shape_data["Color BGR"].values[0]
if not color:
return results_roi, None
Expand Down
9 changes: 7 additions & 2 deletions simba/mixins/pop_up_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from tkinter import *
from tkinter import messagebox
from typing import Dict, List, Optional, Tuple, Union, Callable
from typing import Callable, Dict, List, Optional, Tuple, Union

import PIL.Image
from PIL import ImageTk
Expand Down Expand Up @@ -282,7 +282,12 @@ def create_time_bin_entry(self):
self.time_bin_entrybox.grid(row=0, column=0, sticky=NW)
self.time_bin_frm.grid(row=self.children_cnt_main(), column=0, sticky=NW)

def create_run_frm(self, run_function: Callable, title: Optional[str] = "RUN", btn_txt_clr: Optional[str] = 'black') -> None:
def create_run_frm(
self,
run_function: Callable,
title: Optional[str] = "RUN",
btn_txt_clr: Optional[str] = "black",
) -> None:
"""
Create a label frame with a single button with a specified callback.
Expand Down

0 comments on commit 80e5f48

Please sign in to comment.