### **Single particle cropping from TIRF images**

In [None]:
import os
import numpy as np
import pandas as pd
import tifffile as tiff
from numpy import savez_compressed

# File paths
csv_file_path = r'Imagej_plugin_particle_tracking_output.csv'
tiff_path = r'TIRF_image.tif'
output_dir = r'output_folder'

# Load CSV data
df = pd.read_csv(csv_file_path, encoding='cp949')
df = df.iloc[3:]
x = df.iloc[:, 3].to_numpy().astype(float)
y = df.iloc[:, 4].to_numpy().astype(float)
Tr = df.iloc[:, 1].to_numpy().astype(float)
t = df.iloc[:, 2].to_numpy().astype(float)

table = pd.DataFrame({'Tr': Tr, 'x': x, 'y': y, 't': t}).dropna(subset=['Tr'])

# Load TIFF stack
stack = tiff.imread(tiff_path)

# Create output directory
os.makedirs(output_dir, exist_ok=True)

# Extract and save 7x7 crops for valid trajectories
for tr in table['Tr'].unique():
    traj = table[table['Tr'] == tr]

    if len(traj) < 10:
        continue

    if (traj['x'] < 5).any() or (traj['y'] < 5).any() or (traj['x'] > 508).any() or (traj['y'] > 508).any():
        continue

    if (traj['t'] == 0).any():  # Only save trajectories with t==0 frame
        crop_stack = []
        for j in range(1, len(traj)):
            xx = int(traj.x.iloc[j])
            yy = int(traj.y.iloc[j])
            crop = stack[j, yy-3:yy+4, xx-3:xx+4]
            crop_stack.append(crop)

        if crop_stack:
            output_filename = f"Tr_{int(tr)}_x{xx}_y{yy}_t{len(traj)}.tif"
            output_path = os.path.join(output_dir, output_filename)
            tiff.imwrite(output_path, crop_stack)

# Search for saved .tif crop files
file_list = [
    fname for fname in os.listdir(output_dir)
    if fname.endswith('.tif')
]

# Process each .tif file and save as .npz
for file in file_list:
    file_path = os.path.join(output_dir, file)
    crop_stack = tiff.imread(file_path)

    if crop_stack.shape[0] < 10:
        continue

    # Check if edges are non-zero (quality control)
    if all([
        np.any(crop_stack[:, 0, 0] != 0),
        np.any(crop_stack[:, 6, 0] != 0),
        np.any(crop_stack[:, 0, 6] != 0),
        np.any(crop_stack[:, 6, 6] != 0)
    ]):
        # Reshape to 4D: (1, 7, 7, time)
        crop_stack_4d = crop_stack[np.newaxis, :, :, :]
        y_label = np.zeros((1,))  # Label = 0
        base_name = os.path.splitext(file)[0]
        save_path = os.path.join(output_dir, base_name + '.npz')
        savez_compressed(save_path, crop_stack_4d, y_label)


### **Train set & test set split**

In [None]:
import os
import shutil
import random

# Root directory containing all subfolders with npz files
root_dir = r'Root directory'

# Output directories for train and test sets
train_dir = os.path.join(root_dir, 'training')
test_dir = os.path.join(root_dir, 'test')
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

# Find all folders ending with '-crop'
crop_folders = [f.path for f in os.scandir(root_dir) if f.is_dir() and f.name.endswith('-crop')]
print(f"\n📁 Found {len(crop_folders)} crop folders.")

total_files_copied = 0

for crop_folder_path in crop_folders:
    crop_folder_name = os.path.basename(crop_folder_path)
    npz_files = [f for f in os.listdir(crop_folder_path) if f.endswith('.npz')]

    if not npz_files:
        print(f"  ⚠️ No .npz files found in '{crop_folder_name}'")
        continue

    # Shuffle and split the files into training and testing sets (80/20 split)
    random.shuffle(npz_files)
    split_index = int(len(npz_files) * 0.8)
    train_files = npz_files[:split_index]
    test_files = npz_files[split_index:]

    # Helper function to copy files and track count
    def copy_files(file_list, target_dir, label):
        nonlocal total_files_copied
        for fname in file_list:
            src = os.path.join(crop_folder_path, fname)
            dst_name = f"{crop_folder_name}__{fname}"
            dst = os.path.join(target_dir, dst_name)
            try:
                shutil.copy2(src, dst)
                total_files_copied += 1
            except Exception as e:
                print(f"  ❌ Error copying {fname}: {e}")

    # Copy training and test files
    copy_files(train_files, train_dir, "training")
    copy_files(test_files, test_dir, "test")

    print(f"  ✅ {crop_folder_name}: copied {len(train_files)} to training, {len(test_files)} to test")

# Final summary
print(f"\n🎉 A total of {total_files_copied} .npz files were copied successfully.")
print(f"📂 training folder: {len(os.listdir(train_dir))} files")
print(f"📂 test folder: {len(os.listdir(test_dir))} files")


### **Data loading (classA)**

In [None]:
import os
import numpy as np
import random

#training/test folder paths
base_path = r'root_path'
training_path = os.path.join(base_path, 'training')

def load_npz_from_folder(folder_path, use_fraction=1.0, file_fraction=1.0, folder_label=""):
    """
    Loads .npz files from a folder, applies sliding window, and returns selected chunks.

    Args:
        folder_path (str): Path to the folder containing .npz files.
        use_fraction (float): Fraction of sliding windows to use per file.
        file_fraction (float): Fraction of files to use from the folder.
        folder_label (str): Label for logging.

    Returns:
        np.ndarray: Concatenated array of selected windows, or None if nothing usable.
    """
    all_chunks = []
    all_files = [f for f in os.listdir(folder_path) if f.endswith('.npz')]

    if not all_files:
        print(f"⚠️ No .npz files found in '{folder_label}' folder.")
        return None

    num_files_to_use = max(1, int(len(all_files) * file_fraction))
    selected_files = random.sample(all_files, num_files_to_use)

    print(f"📁 {folder_label}: using {num_files_to_use} of {len(all_files)} files.")

    for filename in selected_files:
        file_path = os.path.join(folder_path, filename)

        try:
            arr = np.load(file_path)['arr_0']  # Expected shape: (1, 7, 7, N)

            if arr.shape[0] != 1 or arr.shape[1:3] != (7, 7):
                print(f"  ⚠️ Skipping malformed file: {filename}")
                continue

            arr = arr[0]  # Shape becomes: (7, 7, N)
            N = arr.shape[2]

            if N < 10:
                print(f"  ⚠️ Skipping short file: {filename} (N={N})")
                continue

            # Generate sliding windows of size 10
            num_windows = N - 10 + 1
            windows = np.stack([arr[:, :, i:i+10] for i in range(num_windows)])

            # Subsample the windows
            select_n = max(1, int(num_windows * use_fraction))
            selected_indices = sorted(random.sample(range(num_windows), select_n))
            selected_windows = windows[selected_indices]

            all_chunks.append(selected_windows)

        except Exception as e:
            print(f"  ❌ Error reading '{file_path}': {e}")

    if all_chunks:
        final_array = np.concatenate(all_chunks, axis=0)
        print(f"✅ {folder_label}: total loaded shape = {final_array.shape}")
        return final_array
    else:
        print(f"⚠️ No usable data found in '{folder_label}' folder.")
        return None


# Define fractions to use
train_file_fraction = 1.0  # Use 100% of training files (As long as one class is not overwhelmingly dominant)

# Load training data
print("\n📂 Processing training data...")
classA_training_data = load_npz_from_folder(
    training_path,
    use_fraction=0.7, #(40%~80% of sliding windows per file, percentage varies depend on test accuracy due to overfitting issue)
    file_fraction=train_file_fraction,
    folder_label="training"
)

# Final check
if classA_training_data is not None:
    print("✅ classA_training_data.shape =", classA_training_data.shape)


### **Data loading (classB)**

In [None]:
import os
import numpy as np
import random

#training/test folder paths
base_path = r'root_path'
training_path = os.path.join(base_path, 'training')

def load_npz_from_folder(folder_path, use_fraction=1.0, file_fraction=1.0, folder_label=""):
    """
    Loads .npz files from a folder, applies sliding window, and returns selected chunks.

    Args:
        folder_path (str): Path to the folder containing .npz files.
        use_fraction (float): Fraction of sliding windows to use per file.
        file_fraction (float): Fraction of files to use from the folder.
        folder_label (str): Label for logging.

    Returns:
        np.ndarray: Concatenated array of selected windows, or None if nothing usable.
    """
    all_chunks = []
    all_files = [f for f in os.listdir(folder_path) if f.endswith('.npz')]

    if not all_files:
        print(f"⚠️ No .npz files found in '{folder_label}' folder.")
        return None

    num_files_to_use = max(1, int(len(all_files) * file_fraction))
    selected_files = random.sample(all_files, num_files_to_use)

    print(f"📁 {folder_label}: using {num_files_to_use} of {len(all_files)} files.")

    for filename in selected_files:
        file_path = os.path.join(folder_path, filename)

        try:
            arr = np.load(file_path)['arr_0']  # Expected shape: (1, 7, 7, N)

            if arr.shape[0] != 1 or arr.shape[1:3] != (7, 7):
                print(f"  ⚠️ Skipping malformed file: {filename}")
                continue

            arr = arr[0]  # Shape becomes: (7, 7, N)
            N = arr.shape[2]

            if N < 10:
                print(f"  ⚠️ Skipping short file: {filename} (N={N})")
                continue

            # Generate sliding windows of size 10
            num_windows = N - 10 + 1
            windows = np.stack([arr[:, :, i:i+10] for i in range(num_windows)])

            # Subsample the windows
            select_n = max(1, int(num_windows * use_fraction))
            selected_indices = sorted(random.sample(range(num_windows), select_n))
            selected_windows = windows[selected_indices]

            all_chunks.append(selected_windows)

        except Exception as e:
            print(f"  ❌ Error reading '{file_path}': {e}")

    if all_chunks:
        final_array = np.concatenate(all_chunks, axis=0)
        print(f"✅ {folder_label}: total loaded shape = {final_array.shape}")
        return final_array
    else:
        print(f"⚠️ No usable data found in '{folder_label}' folder.")
        return None


# Define fractions to use
train_file_fraction = 1.0  # Use 100% of training files (As long as one class is not overwhelmingly dominant)

# Load training data
print("\n📂 Processing training data...")
classB_training_data = load_npz_from_folder(
    training_path,
    use_fraction=0.7, #(40%~80% of sliding windows per file, percentage varies depend on test accuracy due to overfitting issue)
    file_fraction=train_file_fraction,
    folder_label="training"
)

# Final check
if classB_training_data is not None:
    print("✅ classB_training_data.shape =", classB_training_data.shape)


### **Data standardization**

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

# Combine class A and B data
combined_X_3d = np.append(classA_training_data, classB_training_data, axis=0)

# Create labels: 0 for class A, 1 for class B
classA_labels = np.full((classA_training_data.shape[0],), 0)
classB_labels = np.full((classB_training_data.shape[0],), 1)
combined_y_3d = np.append(classA_labels, classB_labels, axis=0)

# Copy for preprocessing
spatial_X = np.copy(combined_X_3d)
spatial_y = np.copy(combined_y_3d)

# Normalize each sample (Z-score per 3D tensor)
mean_vals = np.mean(spatial_X, axis=(1, 2, 3), keepdims=True)
std_vals = np.std(spatial_X, axis=(1, 2, 3), keepdims=True)
spatial_X = (spatial_X - mean_vals) / (std_vals + 1e-8)  # Add epsilon to avoid division by zero

# Split into train/test sets
X_train_spa, X_test_spa, y_train, y_test = train_test_split(
    spatial_X, spatial_y, test_size=0.2, random_state=42
)

# Expand dims if needed for CNN (e.g., (N, 7, 7, 10, 1))
if X_train_spa.ndim == 4:
    X_train_spa = X_train_spa[..., np.newaxis]
if X_test_spa.ndim == 4:
    X_test_spa = X_test_spa[..., np.newaxis]

### **Model training**

In [None]:
import os
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
import matplotlib.pyplot as plt

# Hyperparameters
BATCH_SIZE = 32
LEARNING_RATE = 1e-5
EPOCHS = 30

# Save path
model_dir = '/content/gdrive/MyDrive/gap-nick'
os.makedirs(model_dir, exist_ok=True)
model_name = f'best_model_training_bs{BATCH_SIZE}_lr{LEARNING_RATE:.0e}.keras'
model_path = os.path.join(model_dir, model_name)

# Define the model
model = models.Sequential([
    layers.Input(shape=(7, 7, 10, 1)),
    layers.Conv3D(32, (2, 2, 1), activation='relu'),
    layers.BatchNormalization(),
    layers.Dropout(0.1),

    layers.Conv3D(16, (3, 2, 1), activation='relu'),
    layers.BatchNormalization(),
    #layers.Dropout(0.2),

    layers.Conv3D(32, (2, 2, 10), activation='relu'),
    layers.BatchNormalization(),
    layers.Dropout(0.1),

    layers.Flatten(),
    layers.Dense(16, activation='relu', kernel_regularizer=regularizers.l2(0.05)),
    layers.Dense(2, activation='softmax')
])

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=['accuracy']
)

# Callbacks
checkpoint = ModelCheckpoint(
    model_path,
    monitor='val_accuracy',
    save_best_only=True,
    mode='max',
    verbose=1
)

reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    min_lr=1e-6,
    verbose=1
)

early_stop = EarlyStopping(
    monitor='val_loss',
    patience=6,
    restore_best_weights=True,
    verbose=1
)

# Train the model
history = model.fit(
    X_train_spa, y_train,
    validation_data=(X_test_spa, y_test),
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    callbacks=[checkpoint, reduce_lr, early_stop],
    verbose=2
)

# Plot training history
def plot_history(history):
    plt.figure(figsize=(12, 4))

    # Accuracy
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Val Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Model Accuracy')
    plt.legend()
    plt.grid(True)

    # Loss
    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Model Loss')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

plot_history(history)


### **Model loading**

In [None]:
from tensorflow.keras.models import load_model

# Model loading
model = load_model('model.keras')

### **Model validation**

In [None]:
import os
import numpy as np
import random
import collections

# Define your trained model before this script
# model = ...

# Root directory to search for test folders
root_dir = r'root_folder'

# Find folders ending with 'test'
crop_folders = [f.path for f in os.scandir(root_dir) if f.is_dir() and f.name.endswith('test')]
print(f'\n📁 Found {len(crop_folders)} test folders.\n')

summary = []

for crop_dir in crop_folders:
    folder_name = os.path.basename(crop_dir)
    print(f"\n🔍 Processing folder: {folder_name}")

    file_list = [f for f in os.listdir(crop_dir) if f.endswith('.npz')]
    if not file_list:
        print("  ⚠️ No .npz files found.")
        continue

    # Randomly select files (change ratio if needed)
    selected_files = random.sample(file_list, int(len(file_list) * 1.0))

    max_frame = 6000
    frame = 10
    results = np.full((max_frame, len(selected_files)), 4)  # 4 = invalid/uninitialized

    for idx, filename in enumerate(selected_files):
        file_path = os.path.join(crop_dir, filename)
        try:
            npz_data = np.load(file_path)
            raw_data = npz_data['arr_0']
        except Exception as e:
            print(f"  ❌ Error loading {filename}: {e}")
            continue

        # Reshape to (7, 7, time)
        try:
            reshaped = np.reshape(raw_data, (7, 7, -1))
        except Exception as e:
            print(f"  ❌ Reshape error in {filename}: {e}")
            continue

        # Sliding window with shape (num_windows, 7, 7, 10)
        try:
            windows = np.lib.stride_tricks.sliding_window_view(reshaped, window_shape=frame, axis=2)
            windowed_data = np.transpose(windows, axes=[2, 0, 1, 3])  # → (num_samples, 7, 7, 10)
        except Exception:
            continue

        if windowed_data.shape[0] == 0:
            continue

        # Normalize each window individually
        normalized = np.empty_like(windowed_data)
        for j in range(len(windowed_data)):
            sample = windowed_data[j]
            mean = np.mean(sample)
            std = np.std(sample) + 1e-8
            normalized[j] = (sample - mean) / std

        # Expand dims for 3D CNN: (batch, 7, 7, 10, 1)
        normalized = normalized[..., np.newaxis]

        try:
            preds = model.predict(normalized, verbose=0)
            pred_labels = np.argmax(preds, axis=1)
            results[:len(pred_labels), idx] = pred_labels
        except Exception as e:
            print(f"  ❌ Prediction error on {filename}: {e}")
            continue

    # Post-processing predictions
    count0, count1 = 0, 0
    for i in range(results.shape[1]):
        predictions = results[:, i]
        valid_preds = predictions[predictions != 4]
        if len(valid_preds) == 0:
            continue
        most_common, _ = collections.Counter(valid_preds).most_common(1)[0]
        if most_common == 0:
            count0 += 1
        elif most_common == 1:
            count1 += 1

    print(f"  ✅ Processed files: {len(selected_files)}")
    print(f"     → Predicted as class 0: {count0}")
    print(f"     → Predicted as class 1: {count1}")

    summary.append({
        'folder': folder_name,
        'total': len(selected_files),
        'class_0': count0,
        'class_1': count1
    })

# Final summary
print("\n📊 Summary of all predictions:")
for item in summary:
    print(f"📁 {item['folder']} — Total: {item['total']}, Class 0: {item['class_0']}, Class 1: {item['class_1']}")


In [None]:
import os
import numpy as np
import random
import collections

# Define your trained model before this script
# model = ...

# Root directory to search for test folders
root_dir = r'root_folder'

# Find folders ending with 'test'
crop_folders = [f.path for f in os.scandir(root_dir) if f.is_dir() and f.name.endswith('test')]
print(f'\n📁 Found {len(crop_folders)} test folders.\n')

summary = []

for crop_dir in crop_folders:
    folder_name = os.path.basename(crop_dir)
    print(f"\n🔍 Processing folder: {folder_name}")

    file_list = [f for f in os.listdir(crop_dir) if f.endswith('.npz')]
    if not file_list:
        print("  ⚠️ No .npz files found.")
        continue

    # Randomly select files (change ratio if needed)
    selected_files = random.sample(file_list, int(len(file_list) * 1.0))

    max_frame = 6000
    frame = 10
    results = np.full((max_frame, len(selected_files)), 4)  # 4 = invalid/uninitialized

    for idx, filename in enumerate(selected_files):
        file_path = os.path.join(crop_dir, filename)
        try:
            npz_data = np.load(file_path)
            raw_data = npz_data['arr_0']
        except Exception as e:
            print(f"  ❌ Error loading {filename}: {e}")
            continue

        # Reshape to (7, 7, time)
        try:
            reshaped = np.reshape(raw_data, (7, 7, -1))
        except Exception as e:
            print(f"  ❌ Reshape error in {filename}: {e}")
            continue

        # Sliding window with shape (num_windows, 7, 7, 10)
        try:
            windows = np.lib.stride_tricks.sliding_window_view(reshaped, window_shape=frame, axis=2)
            windowed_data = np.transpose(windows, axes=[2, 0, 1, 3])  # → (num_samples, 7, 7, 10)
        except Exception:
            continue

        if windowed_data.shape[0] == 0:
            continue

        # Normalize each window individually
        normalized = np.empty_like(windowed_data)
        for j in range(len(windowed_data)):
            sample = windowed_data[j]
            mean = np.mean(sample)
            std = np.std(sample) + 1e-8
            normalized[j] = (sample - mean) / std

        # Expand dims for 3D CNN: (batch, 7, 7, 10, 1)
        normalized = normalized[..., np.newaxis]

        try:
            preds = model.predict(normalized, verbose=0)
            pred_labels = np.argmax(preds, axis=1)
            results[:len(pred_labels), idx] = pred_labels
        except Exception as e:
            print(f"  ❌ Prediction error on {filename}: {e}")
            continue

    # Post-processing predictions
    count0, count1 = 0, 0
    for i in range(results.shape[1]):
        predictions = results[:, i]
        valid_preds = predictions[predictions != 4]
        if len(valid_preds) == 0:
            continue
        most_common, _ = collections.Counter(valid_preds).most_common(1)[0]
        if most_common == 0:
            count0 += 1
        elif most_common == 1:
            count1 += 1

    print(f"  ✅ Processed files: {len(selected_files)}")
    print(f"     → Predicted as class 0: {count0}")
    print(f"     → Predicted as class 1: {count1}")

    summary.append({
        'folder': folder_name,
        'total': len(selected_files),
        'class_0': count0,
        'class_1': count1
    })

# Final summary
print("\n📊 Summary of all predictions:")
for item in summary:
    print(f"📁 {item['folder']} — Total: {item['total']}, Class 0: {item['class_0']}, Class 1: {item['class_1']}")
