In [1]:
%matplotlib qt
from skimage.measure import label, regionprops, regionprops_table
import numpy as np
import tifffile
import pandas as pd
import napari
import time
from tqdm import tqdm
from typing import List, Dict, Any

from particle_tracking.utils import get_statck_properties, get_tracks
from dataclasses import dataclass, field, fields

import matplotlib.pyplot as plt
import particle_tracking.stepfinder as sf
# https://www.science.org/doi/10.1126/science.abd9944
# https://github.com/tobiasjj/stepfinder/tree/master
# https://www.sciencedirect.com/science/article/pii/S2001037021002944
# https://www.sciencedirect.com/science/article/pii/016502709190118J 

def classFromDict(className, argDict):
    fieldSet = {f.name for f in fields(className) if f.init}
    filteredArgDict = {k : v for k, v in argDict.items() if k in fieldSet}
    return className(**filteredArgDict)

@dataclass(repr=True)
class Point:
    x:int
    y:int
    z:int = None
    area:float = 0
    bbox:tuple = None
    intensity_max:float = 0
    intensity_mean:float = 0
    intensity_min:float = 0
    label:int = 0
    frame:int = 0
    other:Dict[Any, Any] = None

    def __eq__(self, other):
        return (self.x, self.y, self.z) == (other.x, other.y, other.z)
    
    def __ne__(self, other):
        return not self.__eq__(other)

    def to_time_tuple(self):
        # frame/time_point, (z), y, x
        return [self.frame, self.y, self.x] if self.z is None else [self.frame, self.z, self.y, self.x]
    
    def __getitem__(self, item):
        return getattr(self, item)
    # def __repr__(self) -> str:
    #     return f'X:{self.x}, Y:{self.y}, Z:{self.z} -> {self.intensity_mean}'
    # def __lt__(self, other):
    #     return (self.x, self.y, self.z) < (other.x, other.y, other.z)
    
    # def __le__(self, other):
    #     return (self.x, self.y, self.z) <= (other.x, other.y, other.z)
    
    # def __gt__(self, other):
    #     return (self.x, self.y, self.z) > (other.x, other.y, other.z)
    
    # def __ge__(self, other):
    #     return (self.x, self.y, self.z) >= (other.x, other.y, other.z)

@dataclass
class Track:
    points:List[Point] = field(default_factory=list)
    id: int = 0
    id_key:str = field(default_factory=str)
    data_frame :pd.DataFrame = field(default_factory=pd.DataFrame)

    def __len__(self):
        return len(self.points)
    
    def __repr__(self) -> str:
        return f'id:{self.id}, len: {len(self.points)}'
    
    def init_by_dataframe(self, df:pd.DataFrame, id_key:str):
        df['label'] = df[id_key]
        self.id_key =id_key
        self.data_frame = df
        for ti, r in df.iterrows():
            self.id = int(r[id_key])
            point = classFromDict(Point, r.to_dict())
            self.points.append(point)
    def to_points_list(self):
        return list((p.to_time_tuple() for p in self.points))
    
    def to_list(self):
        # track_id, frame/time_point, (z), y, x
        return list(([self.id, ] + p.to_time_tuple() for p in self.points))
    
    def to_list_by_key(self, key):
        return list((p[key] for p in self.points))

@dataclass
class PointsPool:
    points:List[Point] = field(default_factory=list)

    def __len__(self):
        return len(self.points)

    def find_by_frame(self, frame):
        for p in self.points:
            if p.frame == frame:
                return p
        return None

In [2]:
masks_path = "D:/Data/Sudipta/Arpan/position_test/mask.tif"
images_path = "D:/Data/Sudipta/Arpan/send-1.tif"
# print(mask_path)
images = tifffile.imread(images_path)
masks = tifffile.imread(masks_path)

assert images.shape == masks.shape
st = time.time()
main_pd_frame = get_statck_properties(masks=masks, images=images, show_progress=False)

with pd.option_context('display.max_rows', 200, 'display.max_columns', None):  # more options can be specified also
    print(main_pd_frame)


# trackpy.quiet(True)
# tracked = trackpy.link(f=main_pd_frame,search_range=2, memory=0)
tracked = get_tracks(main_pd_frame)
track_ids = tracked['particle'].unique()
accepted_track_count = 0
tracks = []
points = []
track_objs = []
for track_id in track_ids:
    if len(list(tracked[tracked['particle'] == track_id]['frame'])) >= 200:
        accepted_track_count +=1
        track = Track()
        track.init_by_dataframe(tracked[tracked['particle'] == track_id].copy().reset_index(drop=True), 'particle')
        # for ti, r in track.iterrows():
        track_objs.append(track)
        tracks += track.to_list()
        points += track.to_points_list()
print(f'totlat time require:  {time.time() - st} sec')

# print(f"Total tracks :  {accepted_track_count}")
# print(f"Total number of tracks : {len(tracks)}")
# print(f"tracks[0] : {tracks[0]}, type : {type(tracks[0])}")


# print(f"Total number of points : {len(points)}")
# print(f"point[0] : {points[0]}, type : {type(points[0])}")

# print(f"Intensity tracks[0] : {track_objs[0].to_list_by_key('intensity_mean')}")
intensity_0 = track_objs[0].to_list_by_key('intensity_mean')
x_0 = list(range(len(intensity_0)))

       label           y        x  intensity_mean  intensity_max  \
0          1    0.600000   74.200     1158.400000         1219.0   
1          2    1.000000  180.000     1024.000000         1024.0   
2          3    4.285714  109.000     1150.714286         1308.0   
3          4    5.000000  143.000     1097.000000         1097.0   
4          5    6.875000    8.875     1123.125000         1178.0   
...      ...         ...      ...             ...            ...   
26321      8   72.000000  159.000     1105.444444         1325.0   
26322      9   74.000000  103.000     1106.777778         1168.0   
26323     10   82.875000   82.875     1116.750000         1206.0   
26324     11   86.500000   65.000     1176.166667         1250.0   
26325     12  130.000000  149.500     1118.000000         1158.0   

       intensity_min  area  frame  
0             1117.0     5      0  
1             1024.0     1      0  
2             1092.0     7      0  
3             1097.0     1      0  
4  

In [14]:
data = np.array(intensity_0)
resolution = 10 # in Hz

# Set parameters for filtering the data and finding steps
filter_min_t = 0.005  # None or s
filter_max_t = 0.050  # None or s
expected_min_step_size = 2000.0  # in values of data
# Set additional parameters for filtering the data
filter_time =  None  # None or s
filter_number = 40  # None or number
edginess = 1  # float
# Set additional parameters for finding the steps
expected_min_dwell_t = None  # None or s
step_size_threshold = None  # None (equal to 'adapt'), 'constant', or in values of data

filter_max_t = filter_max_t or filter_time
print(filter_max_t)
try:
    filter_min_t = filter_min_t or min(filter_time, filter_max_t)
except:
    print('You have to give at least one `filter_time`')

print(filter_min_t)
windows = sf.log_spaced_time_windows(filter_min_t, filter_max_t, resolution,
                                  filter_number)
print(windows)
if filter_time is not None:
    print("here")
    window_edge = max(int(np.round(filter_time * resolution)), 1)
    windows = np.unique(np.r_[window_edge, windows])
    i_window = np.where(windows == window_edge)[0][0]
    print(window_edge, windows, i_window)
# fbnl_filter = sf.filter_fbnl(intensity_0, 10, 2)
# step_finder_result \
#     = sf.filter_find_analyse_steps(intensity_0, resolution, filter_time, filter_min_t, filter_max_t,
#                                 filter_number, edginess,
#                                 expected_min_step_size, expected_min_dwell_t,
#                                 step_size_threshold, pad_data=False,
#                                 verbose=True, plot=False)
windows_var = windows
steps_number_pre = []
steps_number = []
aSNRs = []
mSNRs = []
STDs = []

# mean of data_filtered of several banks of predictors (windows)
data_filtered_mean = np.zeros_like(data)
step_size_mean = np.zeros_like(data)
noise_mean = np.zeros_like(data)
for i, (window, window_var) in enumerate(zip(windows, windows_var)):
    print(i, window, window_var)
fbnl_filter = sf.filter_fbnl(data, resolution, window=10,
                                  window_var=None, p=edginess,
                                  pad_data=False)

step_finder_result = sf.find_and_analyse_steps(fbnl_filter, verbose=False)
# print(step_finder_result.steps.number)
# print(step_finder_result.steps_pre)
print(step_finder_result.steps_pre.bounds)
y_steps = step_finder_result.steps_pre.bounds.flatten()
print(y_steps)
print(len(data))
print(np.max(data))
y_data = np.zeros(shape=data.shape)
y_data[:] = np.min(data)
y_data[y_steps] = data[y_steps]
# print(y_data)

print(step_finder_result.steps.number)
y_steps_post = step_finder_result.steps.bounds.flatten()
y_data_post = np.zeros(shape=data.shape)
y_data_post[:] = np.min(data)
y_data_post[y_steps_post] = data[y_steps_post]


0.05
0.005
[0]
0 0 0
[[ 19  26]
 [ 51  52]
 [ 64  66]
 [ 75  77]
 [127 130]
 [142 149]
 [158 160]
 [168 175]
 [184 190]
 [211 222]
 [228 232]
 [247 252]
 [257 265]
 [302 303]
 [333 341]
 [364 367]
 [418 425]
 [457 460]
 [470 474]]
[ 19  26  51  52  64  66  75  77 127 130 142 149 158 160 168 175 184 190
 211 222 228 232 247 252 257 265 302 303 333 341 364 367 418 425 457 460
 470 474]
493
1271.6666666666667
2


In [13]:
import matplotlib.lines as mlines
def newline(p1, p2, color, linestyle='-', linewidth=1):
    ax = plt.gca()
    xmin, xmax = ax.get_xbound()

    if(p2[0] == p1[0]):
        xmin = xmax = p1[0]
        ymin, ymax = ax.get_ybound()
    else:
        ymax = p1[1]+(p2[1]-p1[1])/(p2[0]-p1[0])*(xmax-p1[0])
        ymin = p1[1]+(p2[1]-p1[1])/(p2[0]-p1[0])*(xmin-p1[0])

    l = mlines.Line2D([xmin,xmax], [ymin,ymax], color=color, linestyle=linestyle, linewidth=linewidth)
    ax.add_line(l)
    return l

plt.plot(x_0, fbnl_filter.data, label = "Intensity")
plt.plot(x_0, fbnl_filter.data_filtered, label = "Filter CKF" )
for pos in y_steps_post:
    x1 = x2 = pos
    y1 = 0
    y2 = np.max(data) -1
    newline([x1,y1], [x2,y2], color='green', linestyle='--', linewidth=5)
for pos in y_steps:
    x1 = x2 = pos
    y1 = 0
    y2 = np.max(data) -1
    newline([x1,y1], [x2,y2], 'red')
# plt.plot(x_0, y_data, label = "Step breaks pre")
# plt.plot(x_0, y_data_post, label = "Step breaks post")
# plt.legend()
# plt.show()