<a href="https://colab.research.google.com/github/thuviettran/demo-github/blob/main/cells.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone https://github.com/SHIDCenter/PathoNet.git
%cd PathoNet
!ls


Cloning into 'PathoNet'...
remote: Enumerating objects: 272, done.[K
remote: Counting objects: 100% (26/26), done.[K
remote: Compressing objects: 100% (13/13), done.[K
remote: Total 272 (delta 18), reused 13 (delta 13), pack-reused 246 (from 1)[K
Receiving objects: 100% (272/272), 3.18 MiB | 7.59 MiB/s, done.
Resolving deltas: 100% (149/149), done.
/content/PathoNet
config.py  doc		  models.py	    README.md
configs    evaluation.py  pipeline.py	    requirements.txt
data	   LICENSE	  preprocessing.py  train.py
demo.py    logs		  pretrainedmodels  utils.py


In [1]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np

In [2]:
def RDIM(x, filters, dilation_rate=2):

    shortcut = x
    branch_filters = filters // 2   # reduce width

    # Branch 1
    b1 = layers.Conv2D(branch_filters, 1, padding='same', use_bias=False)(x)
    b1 = layers.BatchNormalization()(b1)
    b1 = layers.LeakyReLU()(b1)

    # Branch 2
    b2 = layers.Conv2D(branch_filters, 3, padding='same', use_bias=False)(x)
    b2 = layers.BatchNormalization()(b2)
    b2 = layers.LeakyReLU()(b2)

    # Branch 3 (dilated)
    b3 = layers.Conv2D(branch_filters, 3, padding='same',
                       dilation_rate=dilation_rate,
                       use_bias=False)(x)
    b3 = layers.BatchNormalization()(b3)
    b3 = layers.LeakyReLU()(b3)

    # Concatenate (≈ 1.5 * filters)
    x = layers.Concatenate()([b1, b2, b3])

    # Compress back to filters
    x = layers.Conv2D(filters, 1, padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)

    # Residual projection if needed
    if shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, 1, padding='same', use_bias=False)(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    x = layers.Add()([x, shortcut])
    x = layers.LeakyReLU()(x)

    return x

In [3]:
def build_pathonet(input_size=(256,256,3), classes=3):

    inputs = tf.keras.Input(shape=input_size)

    # Encoder
    x1 = RDIM(inputs, 32)
    p1 = layers.MaxPooling2D()(x1)

    x2 = RDIM(p1, 64)
    p2 = layers.MaxPooling2D()(x2)

    x3 = RDIM(p2, 128)
    p3 = layers.MaxPooling2D()(x3)

    x4 = RDIM(p3, 256)
    p4 = layers.MaxPooling2D()(x4)

    # Bottleneck (IMPORTANT: 256, not 512)
    b = RDIM(p4, 256)

    # Decoder
    u4 = layers.UpSampling2D()(b)
    u4 = layers.Concatenate()([u4, x4])
    d4 = RDIM(u4, 256)

    u3 = layers.UpSampling2D()(d4)
    u3 = layers.Concatenate()([u3, x3])
    d3 = RDIM(u3, 128)

    u2 = layers.UpSampling2D()(d3)
    u2 = layers.Concatenate()([u2, x2])
    d2 = RDIM(u2, 64)

    u1 = layers.UpSampling2D()(d2)
    u1 = layers.Concatenate()([u1, x1])
    d1 = RDIM(u1, 32)

    outputs = layers.Conv2D(classes, 1, activation='relu')(d1)

    model = tf.keras.Model(inputs, outputs)
    return model

In [4]:
model = build_pathonet((256,256,3), classes=3)
model.summary()

In [35]:
import numpy as np
import cv2
from scipy import ndimage
from skimage.feature import peak_local_max
from skimage.segmentation import watershed
from skimage.measure import label


# def watershed_from_prediction(pred, min_distance=5):
#     cells = []

#     for ch in range(pred.shape[-1]):

#         binary = pred[:, :, ch]

#         if np.sum(binary) == 0:
#             continue

#         # Distance transform
#         D = ndimage.distance_transform_edt(binary)

#         # Local maxima
#         coords = peak_local_max(
#             D,
#             min_distance=min_distance,
#             labels=binary
#         )

#         mask = np.zeros(D.shape, dtype=bool)
#         mask[tuple(coords.T)] = True

#         markers = label(mask)

#         labels_ws = watershed(-D, markers, mask=binary)

#         for lbl in np.unique(labels_ws):
#             if lbl == 0:
#                 continue

#             region = (labels_ws == lbl).astype("uint8") * 255

#             contours, _ = cv2.findContours(
#                 region,
#                 cv2.RETR_EXTERNAL,
#                 cv2.CHAIN_APPROX_SIMPLE
#             )

#             if len(contours) == 0:
#                 continue

#             c = max(contours, key=cv2.contourArea)
#             (x, y), _ = cv2.minEnclosingCircle(c)

#             cells.append([x, y, ch])

#     return np.array(cells)

def watershed_from_prediction(pred, min_distance=5):

    cells = []

    # If single channel → expand to 3D
    if pred.ndim == 2:
        pred = np.expand_dims(pred, axis=-1)

    for ch in range(pred.shape[-1]):

        binary = pred[:, :, ch]

        if np.sum(binary) == 0:
            continue

        D = ndimage.distance_transform_edt(binary)

        coords = peak_local_max(
            D,
            min_distance=min_distance,
            labels=binary
        )

        mask = np.zeros(D.shape, dtype=bool)
        if len(coords) > 0:
            mask[tuple(coords.T)] = True

        markers = label(mask)
        labels_ws = watershed(-D, markers, mask=binary)

        for lbl in np.unique(labels_ws):
            if lbl == 0:
                continue

            region = (labels_ws == lbl).astype("uint8") * 255

            contours, _ = cv2.findContours(
                region,
                cv2.RETR_EXTERNAL,
                cv2.CHAIN_APPROX_SIMPLE
            )

            if len(contours) == 0:
                continue

            c = max(contours, key=cv2.contourArea)
            (x, y), _ = cv2.minEnclosingCircle(c)

            cells.append([x, y, ch])

    return np.array(cells)


In [36]:
# def predict_cells(model, img, thresholds=[120,180,40], min_distance=5):

#     img_input = img / 255.0
#     img_input = np.expand_dims(img_input, 0)

#     pred = model.predict(img_input, verbose=0)
#     pred = np.squeeze(pred)

#     # Scale like original repo
#     pred = pred * 255

#     # Threshold per channel
#     for i in range(3):
#         pred[:, :, i][pred[:, :, i] < thresholds[i]] = 0
#         pred[:, :, i][pred[:, :, i] > 0] = 255

#     cells = watershed_from_prediction(
#         pred.astype(np.uint8),
#         min_distance=min_distance
#     )

#     return cells
# def predict_cells(model, img, threshold_ratio=0.4, min_distance=5):

#     img_input = img / 255.0
#     img_input = np.expand_dims(img_input, 0)

#     pred = model.predict(img_input, verbose=0)[0]

#     cells = []

#     for ch in range(3):

#         density = pred[:, :, ch]

#         if np.max(density) <= 0:
#             continue

#         threshold = np.max(density) * threshold_ratio
#         binary = (density > threshold).astype(np.uint8)

#         cells_ch = watershed_from_prediction(
#             binary,
#             min_distance=min_distance
#         )

#         if len(cells_ch) > 0:
#             cells.extend(cells_ch)

#     return np.array(cells)

def predict_cells(model, img, threshold_ratio=0.4, min_distance=5):

    img_input = img / 255.0
    img_input = np.expand_dims(img_input, 0)

    pred = model.predict(img_input, verbose=0)[0]  # ← IMPORTANT

    cells = []

    for ch in range(pred.shape[-1]):

        density = pred[:, :, ch]

        if np.max(density) <= 0:
            continue

        threshold = np.max(density) * threshold_ratio
        binary = (density > threshold).astype(np.uint8)

        cells_ch = watershed_from_prediction(binary, min_distance)

        if len(cells_ch) > 0:
            cells.extend(cells_ch)

    return np.array(cells)



In [38]:
# Take one test image
img_path, json_path = test_files[0]

img = cv2.imread(img_path)
img = cv2.resize(img, (256,256))

img_input = img / 255.0
img_input = np.expand_dims(img_input, 0)

pred = model.predict(img_input, verbose=0)

print("Raw prediction shape:", pred.shape)

pred = pred[0]   # remove batch dimension
print("After removing batch:", pred.shape)



Raw prediction shape: (1, 256, 256, 3)
After removing batch: (256, 256, 3)


In [7]:
import tensorflow as tf
print(tf.__version__)


2.19.0


In [8]:
import numpy as np
import json
import tensorflow as tf


In [9]:
import os
import json
import numpy as np
import cv2
import tensorflow as tf


In [10]:
def gaussian_2d(shape, sigma, center):
    h, w = shape
    y, x = np.ogrid[:h, :w]
    cy, cx = center

    g = np.exp(-((x - cx)**2 + (y - cy)**2) / (2 * sigma**2))
    g /= np.sum(g)   # <-- normalize by sum, NOT by 2πσ²

    return g


In [11]:
import json

def json_to_density_map(json_path, img_shape=(256,256), num_classes=3, sigma=3):
    density = np.zeros((*img_shape, num_classes), dtype=np.float32)

    with open(json_path, "r") as f:
        points = json.load(f)

    for p in points:
        x = int(p["x"])
        y = int(p["y"])
        cls = int(p["label_id"]) - 1  # labels: 1,2,3 → channels: 0,1,2

        if 0 <= x < img_shape[1] and 0 <= y < img_shape[0]:
            density[:,:,cls] += gaussian_2d(
                img_shape,
                sigma=sigma,
                center=(y, x)
            )

    return density


In [12]:
import os

def build_file_list(root_dir):
    files = []
    for name in os.listdir(root_dir):
        if name.endswith(".jpg"):
            img_path = os.path.join(root_dir, name)
            json_path = os.path.join(
                root_dir, name.replace(".jpg", ".json")
            )
            if os.path.exists(json_path):
                files.append((img_path, json_path))
            else:
                print("Missing label for:", img_path)
    return files


In [41]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [14]:
def make_dataset(file_list, batch_size=4, shuffle=True):
    img_paths = [f[0] for f in file_list]
    json_paths = [f[1] for f in file_list]

    ds = tf.data.Dataset.from_tensor_slices((img_paths, json_paths))

    if shuffle:
        ds = ds.shuffle(buffer_size=len(file_list), reshuffle_each_iteration=True)

    ds = ds.map(tf_load_sample, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.prefetch(tf.data.AUTOTUNE)

    return ds


In [16]:
train_files = build_file_list("/content/drive/MyDrive/256x256 cropped images/train256")
test_files  = build_file_list("/content/drive/MyDrive/256x256 cropped images/test256")



Missing label for: /content/drive/MyDrive/256x256 cropped images/train256/k-2765_0204_11 (1).jpg


In [25]:
# BATCH_SIZE = 4

# train_ds = make_dataset(train_files, BATCH_SIZE, shuffle=True)
# test_ds  = make_dataset(test_files,  BATCH_SIZE, shuffle=False)
BATCH_SIZE = 4

train_ds = make_dataset(train_files, BATCH_SIZE, shuffle=True)
test_ds  = make_dataset(test_files,  BATCH_SIZE, shuffle=False)

for x, y in train_ds.take(1):
    print(x.shape, y.shape)



(4, 256, 256, 3) (4, 256, 256, 3)


In [21]:
print("Train samples:", len(train_files))
print("Test samples:", len(test_files))
print(train_files[0])


Train samples: 1656
Test samples: 700
('/content/drive/MyDrive/256x256 cropped images/train256/SC11475-17_0284_5.jpg', '/content/drive/MyDrive/256x256 cropped images/train256/SC11475-17_0284_5.json')


In [18]:
import tensorflow as tf
from PIL import Image

IMG_SIZE = (256, 256)
NUM_CLASSES = 3

def load_sample(img_path, json_path):
    # Load image
    img = Image.open(img_path).convert("RGB")
    img = img.resize(IMG_SIZE)
    img = np.array(img, dtype=np.float32) / 255.0

    # Load density map
    density = json_to_density_map(
        json_path,
        img_shape=IMG_SIZE,
        num_classes=NUM_CLASSES,
        sigma=3
    )

    return img, density


In [19]:
def tf_load_sample(img_path, json_path):
    img, dens = tf.numpy_function(
        load_sample,
        [img_path, json_path],
        [tf.float32, tf.float32]
    )

    img.set_shape((256,256,3))
    dens.set_shape((256,256,3))
    return img, dens


In [22]:
for x, y in train_ds.take(1):
    print(x.shape, y.shape)
    print("Counts:", tf.reduce_sum(y, axis=[1,2]))


(4, 256, 256, 3) (4, 256, 256, 3)
Counts: tf.Tensor(
[[ 27.         112.           5.        ]
 [  0.99999994  14.           3.        ]
 [ 21.          17.           0.        ]
 [ 10.          42.           0.        ]], shape=(4, 3), dtype=float32)


In [None]:
# callbacks = [
#     tf.keras.callbacks.EarlyStopping(
#         monitor="val_loss",
#         patience=8,
#         restore_best_weights=True,
#         verbose=1
#     ),
#     tf.keras.callbacks.ModelCheckpoint(
#         filepath="/content/pathonet_best.keras",
#         monitor="val_loss",
#         save_best_only=True,
#         verbose=1
#     )
# ]


In [26]:
# # model = build_pathonet(input_size=(256,256,3), classes=3)
# model = build_pathonet((256,256,3), classes=3)


# model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
#     loss="mse"
# )

# history = model.fit(
#     train_ds,
#     validation_data=test_ds,
#     epochs=100,
#     callbacks=callbacks
# )
model = build_pathonet((256,256,3), classes=3)

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
    loss="mse"
)

callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=10,
        restore_best_weights=True,
        verbose=1
    ),
    tf.keras.callbacks.ModelCheckpoint(
        filepath="/content/drive/MyDrive/experiments/pathonet_best.keras",
        monitor="val_loss",
        save_best_only=True,
        verbose=1
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=4,
        verbose=1
    )
]

history = model.fit(
    train_ds,
    validation_data=test_ds,
    epochs=100,
    callbacks=callbacks
)



Epoch 1/100
[1m414/414[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - loss: 0.2240
Epoch 1: val_loss improved from inf to 0.01750, saving model to /content/drive/MyDrive/experiments/pathonet_best.keras
[1m414/414[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1175s[0m 3s/step - loss: 0.2237 - val_loss: 0.0175 - learning_rate: 5.0000e-05
Epoch 2/100
[1m414/414[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 275ms/step - loss: 0.0131
Epoch 2: val_loss improved from 0.01750 to 0.00835, saving model to /content/drive/MyDrive/experiments/pathonet_best.keras
[1m414/414[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m165s[0m 399ms/step - loss: 0.0131 - val_loss: 0.0084 - learning_rate: 5.0000e-05
Epoch 3/100
[1m414/414[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 286ms/step - loss: 0.0067
Epoch 3: val_loss improved from 0.00835 to 0.00509, saving model to /content/drive/MyDrive/experiments/pathonet_best.keras
[1m414/414[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

KeyboardInterrupt: 

In [44]:
model = tf.keras.models.load_model("/content/drive/MyDrive/experiments/pathonet_best.keras")

In [28]:
def evaluate_mae(model, dataset):
    total_error = 0
    total_images = 0

    for x, y in dataset:
        pred = model.predict(x, verbose=0)

        gt_counts = tf.reduce_sum(y, axis=[1,2])
        pred_counts = tf.reduce_sum(pred, axis=[1,2])

        error = tf.abs(gt_counts - pred_counts)
        total_error += tf.reduce_sum(error).numpy()
        total_images += x.shape[0]

    return total_error / total_images


In [45]:
mae = evaluate_mae(model, test_ds)
print("Test MAE:", mae)


Test MAE: 55.84832


In [30]:
def compute_f1(pred_cells, gt_cells, dist_thresh=6):

    if len(pred_cells) == 0 and len(gt_cells) == 0:
        return 1.0
    if len(pred_cells) == 0 or len(gt_cells) == 0:
        return 0.0

    from scipy.spatial import distance_matrix

    D = distance_matrix(pred_cells[:, :2], gt_cells[:, :2])

    matched_gt = set()
    TP = 0

    for i in range(len(pred_cells)):
        j = np.argmin(D[i])
        if D[i, j] < dist_thresh and j not in matched_gt:
            TP += 1
            matched_gt.add(j)

    FP = len(pred_cells) - TP
    FN = len(gt_cells) - TP

    precision = TP / (TP + FP + 1e-8)
    recall = TP / (TP + FN + 1e-8)

    return 2 * precision * recall / (precision + recall + 1e-8)


In [31]:
def get_gt_centers(json_path):
    with open(json_path, "r") as f:
        points = json.load(f)

    centers = []
    for p in points:
        x = int(p["x"])
        y = int(p["y"])
        cls = int(p["label_id"]) - 1
        centers.append([x, y, cls])

    return np.array(centers)


In [32]:
from scipy.spatial import distance_matrix

def compute_f1_image(pred_cells, gt_cells, dist_thresh=6):

    if len(pred_cells) == 0 and len(gt_cells) == 0:
        return 1.0
    if len(pred_cells) == 0 or len(gt_cells) == 0:
        return 0.0

    TP = 0
    matched_gt = set()

    D = distance_matrix(pred_cells[:, :2], gt_cells[:, :2])

    for i in range(len(pred_cells)):
        j = np.argmin(D[i])
        if D[i, j] <= dist_thresh and j not in matched_gt:
            TP += 1
            matched_gt.add(j)

    FP = len(pred_cells) - TP
    FN = len(gt_cells) - TP

    precision = TP / (TP + FP + 1e-8)
    recall = TP / (TP + FN + 1e-8)

    f1 = 2 * precision * recall / (precision + recall + 1e-8)

    return f1


In [33]:
def evaluate_f1(model, file_list, threshold_ratio=0.4, min_distance=5):

    f1_scores = []

    for img_path, json_path in file_list:

        # Load image
        img = cv2.imread(img_path)
        img = cv2.resize(img, (256,256))

        # Predict cells
        pred_cells = predict_cells(
            model,
            img,
            threshold_ratio=threshold_ratio,
            min_distance=min_distance
        )

        # Get GT centers
        gt_cells = get_gt_centers(json_path)

        # Compute F1
        f1 = compute_f1_image(pred_cells, gt_cells)

        f1_scores.append(f1)

    return np.mean(f1_scores)


In [42]:
model = tf.keras.models.load_model(
    "/content/drive/MyDrive/experiments/pathonet_best.keras"
)

f1_score = evaluate_f1(
    model,
    test_files,
    threshold_ratio=0.4,
    min_distance=5
)

print("Average F1:", f1_score)


Average F1: 0.10998377207921546


In [46]:
img_path, json_path = train_files[0]

density = json_to_density_map(json_path)

print("GT count (from JSON):", len(get_gt_centers(json_path)))
print("Density sum:", np.sum(density))


GT count (from JSON): 4
Density sum: 4.0


In [48]:
for x, y in test_ds.take(1):
    pred = model.predict(x, verbose=0)
    print("GT counts:", tf.reduce_sum(y, axis=[1,2]).numpy())
    print("Pred counts:", tf.reduce_sum(pred, axis=[1,2]).numpy())


GT counts: [[ 1.         13.          4.        ]
 [ 9.         28.          0.99999994]
 [ 4.         20.          0.        ]
 [ 8.         31.          0.        ]]
Pred counts: [[ 3.3320322  12.085581   11.96955   ]
 [-0.6416991  26.220314    5.829939  ]
 [-6.714486   62.725372   -7.9477057 ]
 [ 0.32256067 11.924948   31.376263  ]]
