# Prepare mitosis time series data

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from cellpose import models
from cellpose.io import imread
import glob
from pathlib import Path
from PIL import Image, ImageSequence
from tqdm import tqdm
import os
import os.path
# from livecell_tracker import segment
from livecell_tracker import core
from livecell_tracker.core import datasets
from livecell_tracker.core.datasets import LiveCellImageDataset, SingleImageDataset
from skimage import measure
from livecell_tracker.core import SingleCellTrajectory, SingleCellStatic

In [None]:
# Root folder of the annotation JSON files; each class lives in its own subfolder.
sample_json_dir = Path("./EBSS_starvation_24h_xy16_annotation")
sample_dataset_dir = sample_json_dir / "datasets"
class_subfolders = ["mitosis", "apoptosis", "normal"]

# Map each class name to its samples: one entry per JSON file found in the
# class subfolder, holding whatever load_single_cells_json returns for it.
class_samples = {}
for subfolder in class_subfolders:
    json_pattern = str(sample_json_dir / subfolder / "*.json")
    class_samples[subfolder] = [
        SingleCellStatic.load_single_cells_json(sample_path)
        for sample_path in glob.glob(json_pattern)
    ]

In [None]:
# Display the loaded samples, keyed by class name, as the cell output.
class_samples

Automatically prepare normal samples

This step requires tracking to be completed first.

In [None]:
# Gather every single cell that belongs to an annotated non-normal sample so
# that automatically generated "normal" samples can avoid them, and count how
# many non-normal samples there are in total.
exclude_scs = set()
total_non_normal_samples = 0
for class_name, samples in class_samples.items():
    if class_name == "normal":
        continue
    total_non_normal_samples += len(samples)
    for sample in samples:
        # each sample is an iterable of single cells
        exclude_scs.update(sample)

Load all single-cell objects (scs)

In [None]:
# JSON file holding the full set of single cells used for tracking below.
all_scs_json_path = "./datasets/test_scs_EBSS_starvation/tmp_corrected_scs.json"
# Deserialize the file into single-cell objects.
all_scs = SingleCellStatic.load_single_cells_json(all_scs_json_path)

In [None]:
import json
from livecell_tracker.core.single_cell import SingleCellTrajectoryCollection
from livecell_tracker.track.sort_tracker_utils import (
    track_SORT_bbox_from_scs
)
# Alternative (disabled): load a previously saved trajectory collection from
# JSON instead of re-running the tracker.
# with open("./EBSS_starvation_24h_xy16_annotation/single_cell_trajectory_collection.json", "r") as file:
#     json_dict = json.load(file)
# sctc = SingleCellTrajectoryCollection().load_from_json_dict(json_dict)

# Track all single cells with the SORT-based helper to obtain trajectories.
# The image dataset is taken from the first cell, so this raises IndexError if
# all_scs is empty.
# NOTE(review): presumably every cell shares that same img_dataset, and
# min_hits / max_age are the usual SORT track-confirmation / track-expiry
# thresholds — confirm against track_SORT_bbox_from_scs.
sctc = track_SORT_bbox_from_scs(all_scs, raw_imgs=all_scs[0].img_dataset, min_hits=3, max_age=3)

In [None]:
# Target: ten automatically generated normal samples per annotated
# non-normal sample.
objective_sample_num = total_non_normal_samples * 10

# Sample length in frames is drawn from [low, high): np.random.randint
# excludes the upper bound, so lengths range from 3 to 9.
normal_frame_len_range = (3, 10)
counter = 0
normal_samples = []

# Budget of sampling attempts. Every attempt consumes one unit, whether it is
# accepted or rejected, so the loop is guaranteed to terminate even when no
# trajectory can yield a valid sample.
max_trial_counter = 100000

# The set of tracks does not change while sampling, so list the ids once.
track_ids = list(sctc.track_id_to_trajectory.keys())

while track_ids and counter < objective_sample_num and max_trial_counter > 0:
    max_trial_counter -= 1

    # randomly select a trajectory and a sample length
    track_id = np.random.choice(track_ids)
    sct = sctc.get_trajectory(track_id)
    frame_len = np.random.randint(*normal_frame_len_range)

    times = sorted(sct.timeframe_to_single_cell.keys())
    if len(times) < frame_len:
        # trajectory too short for the requested length
        continue

    # Valid window starts are 0 .. len(times) - frame_len inclusive; the +1
    # compensates for randint's exclusive upper bound so the window ending on
    # the trajectory's last frame can be drawn as well.
    start_idx = np.random.randint(0, len(times) - frame_len + 1)
    start_time = times[start_idx]
    end_time = times[start_idx + frame_len - 1]

    sub_sct = sct.subsct(start_time, end_time)
    new_sample = list(sub_sct.timeframe_to_single_cell.values())

    # reject windows that overlap any annotated non-normal cell
    if any(sc in exclude_scs for sc in new_sample):
        continue

    normal_samples.append(new_sample)
    counter += 1

if counter < objective_sample_num:
    print(f"Warning: generated only {counter} of {objective_sample_num} normal samples")

normal_samples[:2]

In [None]:
# Add the generated samples to the "normal" class.
# NOTE: extend() appends in place, so re-running this cell adds the same
# samples again.
class_samples["normal"].extend(normal_samples)

## Prepare videos and annotations for MMDetection

In [None]:
# Class names available in class_samples (a live dict-keys view, not a copy).
classes = class_samples.keys()
classes

In [None]:
from livecell_tracker.core.utils import gray_img_to_rgb
from livecell_tracker.preprocess.utils import normalize_img_to_uint8

In [None]:
# Number of single cells in each generated normal sample.
[len(sample) for sample in normal_samples]

In [None]:
from livecell_tracker.track.classify_utils import video_frames_and_masks_from_sample

In [None]:
from typing import List
import cv2
import numpy as np
import pandas as pd

def gen_mp4_from_frames(video_frames, output_file, fps):
    """Encode a sequence of frames into an MP4 file using OpenCV.

    Args:
        video_frames: Non-empty, indexable sequence of image arrays. The
            video size is taken from the first frame's shape.
        output_file: Destination path (str or Path) of the video file.
        fps: Frame rate of the generated video.

    Raises:
        ValueError: If ``video_frames`` is empty.
        IOError: If the OpenCV video writer cannot be opened for
            ``output_file``.
    """
    if len(video_frames) == 0:
        raise ValueError("video_frames is empty; cannot generate a video")

    # array shape is (height, width, ...) while OpenCV expects (width, height)
    frame_size = video_frames[0].shape[:2][::-1]

    # Create a VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(str(output_file), fourcc, fps, frame_size)
    try:
        # An unopened writer silently ignores write(), which would leave no
        # video behind without any error; fail loudly instead.
        if not out.isOpened():
            raise IOError(f"Could not open video writer for {output_file}")
        # Write each frame to the output video
        for frame in video_frames:
            out.write(frame)
    finally:
        # always release the writer, even when a write fails
        out.release()

def gen_samples_mp4s(sc_samples: "List[List[SingleCellStatic]]", class_label, output_dir, fps = 1):
    """Write one raw video and one mask video per sample.

    For the i-th sample, the files ``{class_label}_{i}.mp4`` and
    ``{class_label}_{i}_mask.mp4`` are written into ``output_dir``.

    Args:
        sc_samples: List of samples; each sample is a list of single cells.
        class_label: Class name used as the file name prefix.
        output_dir: Directory (str or Path) receiving the videos. It is not
            created here.
        fps: Frame rate of the generated videos.

    Returns:
        List of paths of the raw (non-mask) videos, in sample order.
    """
    # accept plain strings as well as Path objects
    output_dir = Path(output_dir)

    res_paths = []
    for i, sample in enumerate(sc_samples):
        output_file = output_dir / f'{class_label}_{i}.mp4'
        mask_output_file = output_dir / f'{class_label}_{i}_mask.mp4'

        print("len sample: ", len(sample))
        video_frames, video_frame_masks = video_frames_and_masks_from_sample(sample)
        print("len video_frames: ", len(video_frames))
        print("len masks video: ", len(video_frame_masks))

        gen_mp4_from_frames(video_frames, output_file, fps=fps)
        gen_mp4_from_frames(video_frame_masks, mask_output_file, fps=fps)
        # only the raw video path is recorded; the mask video sits next to it
        res_paths.append(output_file)
    return res_paths

# The label index written to the CSV files is the position of the class in
# this list.
class_labels = ['mitosis', 'apoptosis', 'normal']

# All videos go into one flat folder, created once up front.
output_dir = Path('notebook_results/mmaction_train_data') / "videos"
output_dir.mkdir(exist_ok=True, parents=True)

csv_data_list = []
for class_label_index, class_label in enumerate(class_labels):
    video_frames_samples = class_samples[class_label]
    res_paths = gen_samples_mp4s(video_frames_samples, class_label, output_dir)

    # rows are (file name, label index); only the file name is stored,
    # without the directory part
    csv_data_list.extend((path.name, class_label_index) for path in res_paths)


data_df_path = 'notebook_results/mmaction_train_data/all_data.csv'
train_df_path = 'notebook_results/mmaction_train_data/train_data.csv'
test_df_path = 'notebook_results/mmaction_train_data/test_data.csv'
df = pd.DataFrame(csv_data_list, columns=['video_path', 'label_index'])
# space-separated "<file name> <label index>" lines, no header and no index
df.to_csv(data_df_path, index=False, header=False, sep=' ')



In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows for testing; the fixed seed makes the split
# reproducible for a given df.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# Write each split as space-separated rows without header or index column.
for split_df, split_path in ((train_df, train_df_path), (test_df, test_df_path)):
    split_df.to_csv(split_path, sep=' ', header=False, index=False)
