<a href="https://colab.research.google.com/github/thuviettran/demo-github1/blob/main/cells.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np

In [2]:
def RDIM(x, filters, dilation_rate=4):

    shortcut = x
    branch_filters = filters // 2   # IMPORTANT

    # -------- Branch 1 --------
    b1 = layers.Conv2D(branch_filters, 3, padding='same')(x)
    b1 = layers.BatchNormalization()(b1)
    b1 = layers.LeakyReLU()(b1)

    b1 = layers.Conv2D(branch_filters, 3, padding='same')(b1)
    b1 = layers.BatchNormalization()(b1)
    b1 = layers.LeakyReLU()(b1)

    # -------- Branch 2 (dilated) --------
    b2 = layers.Conv2D(branch_filters, 3, padding='same',
                       dilation_rate=dilation_rate)(x)
    b2 = layers.BatchNormalization()(b2)
    b2 = layers.LeakyReLU()(b2)

    b2 = layers.Conv2D(branch_filters, 3, padding='same',
                       dilation_rate=dilation_rate)(b2)
    b2 = layers.BatchNormalization()(b2)
    b2 = layers.LeakyReLU()(b2)

    # -------- Concatenate --------
    merged = layers.Concatenate()([b1, b2])  # Now channels = filters

    # -------- Residual projection --------
    if shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, 1, padding='same')(shortcut)

    # -------- Residual Add --------
    out = layers.Add()([merged, shortcut])

    out = layers.BatchNormalization()(out)
    out = layers.LeakyReLU()(out)

    return out


In [3]:
def build_pathonet(input_size=(256,256,3), classes=3):

    inputs = tf.keras.Input(shape=input_size)

    # First block
    x1 = layers.Conv2D(16, 3, padding='same')(inputs)
    x1 = layers.BatchNormalization()(x1)
    x1 = layers.LeakyReLU()(x1)

    x1 = layers.Conv2D(16, 3, padding='same')(x1)
    x1 = layers.BatchNormalization()(x1)
    x1 = layers.LeakyReLU()(x1)

    p1 = layers.MaxPooling2D()(x1)

    # Encoder
    x2 = RDIM(p1, 32)
    print("x2:", x2.shape)
    p2 = layers.MaxPooling2D()(x2)

    x3 = RDIM(p2, 64)
    print("x3:", x3.shape)
    p3 = layers.MaxPooling2D()(x3)

    x4 = RDIM(p3, 128)
    print("x4:", x4.shape)
    p4 = layers.MaxPooling2D()(x4)

    b = RDIM(p4, 256)

    # Decoder
    u4 = layers.UpSampling2D()(b)
    u4 = layers.Concatenate()([u4, x4])
    d4 = RDIM(u4, 128)

    u3 = layers.UpSampling2D()(d4)
    u3 = layers.Concatenate()([u3, x3])
    d3 = RDIM(u3, 64)

    u2 = layers.UpSampling2D()(d3)
    u2 = layers.Concatenate()([u2, x2])
    d2 = RDIM(u2, 32)

    u1 = layers.UpSampling2D()(d2)
    u1 = layers.Concatenate()([u1, x1])
    d1 = RDIM(u1, 16)

    outputs = layers.Conv2D(classes, 1, activation='linear')(d1)

    return tf.keras.Model(inputs, outputs)


In [4]:
model = build_pathonet((256,256,3), 3)
model.summary()


x2: (None, 128, 128, 32)
x3: (None, 64, 64, 64)
x4: (None, 32, 32, 128)


In [5]:
for layer in model.layers:
    try:
        print(layer.name, layer.output.shape)
    except:
        print(layer.name)


input_layer (None, 256, 256, 3)
conv2d (None, 256, 256, 16)
batch_normalization (None, 256, 256, 16)
leaky_re_lu (None, 256, 256, 16)
conv2d_1 (None, 256, 256, 16)
batch_normalization_1 (None, 256, 256, 16)
leaky_re_lu_1 (None, 256, 256, 16)
max_pooling2d (None, 128, 128, 16)
conv2d_2 (None, 128, 128, 16)
conv2d_4 (None, 128, 128, 16)
batch_normalization_2 (None, 128, 128, 16)
batch_normalization_4 (None, 128, 128, 16)
leaky_re_lu_2 (None, 128, 128, 16)
leaky_re_lu_4 (None, 128, 128, 16)
conv2d_3 (None, 128, 128, 16)
conv2d_5 (None, 128, 128, 16)
batch_normalization_3 (None, 128, 128, 16)
batch_normalization_5 (None, 128, 128, 16)
leaky_re_lu_3 (None, 128, 128, 16)
leaky_re_lu_5 (None, 128, 128, 16)
concatenate (None, 128, 128, 32)
conv2d_6 (None, 128, 128, 32)
add (None, 128, 128, 32)
batch_normalization_6 (None, 128, 128, 32)
leaky_re_lu_6 (None, 128, 128, 32)
max_pooling2d_1 (None, 64, 64, 32)
conv2d_7 (None, 64, 64, 32)
conv2d_9 (None, 64, 64, 32)
batch_normalization_7 (None, 64, 6

In [None]:
import numpy as np
import cv2
from scipy import ndimage
from skimage.feature import peak_local_max
from skimage.segmentation import watershed
from skimage.measure import label


def watershed_from_prediction(pred, min_distance=5):
    cells = []

    for ch in range(pred.shape[-1]):

        binary = pred[:, :, ch]

        if np.sum(binary) == 0:
            continue

        # Distance transform
        D = ndimage.distance_transform_edt(binary)

        # Local maxima
        coords = peak_local_max(
            D,
            min_distance=min_distance,
            labels=binary
        )

        mask = np.zeros(D.shape, dtype=bool)
        mask[tuple(coords.T)] = True

        markers = label(mask)

        labels_ws = watershed(-D, markers, mask=binary)

        for lbl in np.unique(labels_ws):
            if lbl == 0:
                continue

            region = (labels_ws == lbl).astype("uint8") * 255

            contours, _ = cv2.findContours(
                region,
                cv2.RETR_EXTERNAL,
                cv2.CHAIN_APPROX_SIMPLE
            )

            if len(contours) == 0:
                continue

            c = max(contours, key=cv2.contourArea)
            (x, y), _ = cv2.minEnclosingCircle(c)

            cells.append([x, y, ch])

    return np.array(cells)

In [None]:
def predict_cells(model, img, thresholds=[120,180,40], min_distance=5):

    img_input = img / 255.0
    img_input = np.expand_dims(img_input, 0)

    pred = model.predict(img_input, verbose=0)
    pred = np.squeeze(pred)

    # Scale like original repo
    pred = pred * 255

    # Threshold per channel
    for i in range(3):
        pred[:, :, i][pred[:, :, i] < thresholds[i]] = 0
        pred[:, :, i][pred[:, :, i] > 0] = 255

    cells = watershed_from_prediction(
        pred.astype(np.uint8),
        min_distance=min_distance
    )

    return cells

In [None]:
import tensorflow as tf
print(tf.__version__)


2.19.0


In [None]:
import numpy as np
import json
import tensorflow as tf


In [None]:
import os
import json
import numpy as np
import cv2
import tensorflow as tf


In [None]:
def gaussian_2d(shape, sigma, center):
    h, w = shape
    y, x = np.ogrid[:h, :w]
    cy, cx = center

    g = np.exp(-((x - cx)**2 + (y - cy)**2) / (2 * sigma**2))
    g /= np.sum(g)   # <-- normalize by sum, NOT by 2πσ²

    return g


In [None]:
import json

def json_to_density_map(json_path, img_shape=(256,256), num_classes=3, sigma=3):
    density = np.zeros((*img_shape, num_classes), dtype=np.float32)

    with open(json_path, "r") as f:
        points = json.load(f)

    for p in points:
        x = int(p["x"])
        y = int(p["y"])
        cls = int(p["label_id"]) - 1  # labels: 1,2,3 → channels: 0,1,2

        if 0 <= x < img_shape[1] and 0 <= y < img_shape[0]:
            density[:,:,cls] += gaussian_2d(
                img_shape,
                sigma=sigma,
                center=(y, x)
            )

    return density


In [None]:
import os

def build_file_list(root_dir):
    files = []
    for name in os.listdir(root_dir):
        if name.endswith(".jpg"):
            img_path = os.path.join(root_dir, name)
            json_path = os.path.join(
                root_dir, name.replace(".jpg", ".json")
            )
            if os.path.exists(json_path):
                files.append((img_path, json_path))
            else:
                print("Missing label for:", img_path)
    return files


In [None]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
def make_dataset(file_list, batch_size=4, shuffle=True):
    img_paths = [f[0] for f in file_list]
    json_paths = [f[1] for f in file_list]

    ds = tf.data.Dataset.from_tensor_slices((img_paths, json_paths))

    if shuffle:
        ds = ds.shuffle(buffer_size=len(file_list), reshuffle_each_iteration=True)

    ds = ds.map(tf_load_sample, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(batch_size)
    ds = ds.prefetch(tf.data.AUTOTUNE)

    return ds


In [None]:
train_files = build_file_list("/content/drive/MyDrive/256x256 cropped images/train256")
test_files  = build_file_list("/content/drive/MyDrive/256x256 cropped images/test256")

train_ds = make_dataset(train_files, batch_size=4)
test_ds  = make_dataset(test_files, batch_size=4, shuffle=False)


In [None]:
print("Train samples:", len(train_files))
print("Test samples:", len(test_files))
print(train_files[0])


Train samples: 1656
Test samples: 700
('/content/drive/MyDrive/256x256 cropped images/train256/294_0039_10.jpg', '/content/drive/MyDrive/256x256 cropped images/train256/294_0039_10.json')


In [None]:
import tensorflow as tf
from PIL import Image

IMG_SIZE = (256, 256)
NUM_CLASSES = 3

def load_sample(img_path, json_path):
    # Load image
    img = Image.open(img_path).convert("RGB")
    img = img.resize(IMG_SIZE)
    img = np.array(img, dtype=np.float32) / 255.0

    # Load density map
    density = json_to_density_map(
        json_path,
        img_shape=IMG_SIZE,
        num_classes=NUM_CLASSES,
        sigma=3
    )

    return img, density


In [None]:
def tf_load_sample(img_path, json_path):
    img, dens = tf.numpy_function(
        load_sample,
        [img_path, json_path],
        [tf.float32, tf.float32]
    )

    img.set_shape((256,256,3))
    dens.set_shape((256,256,3))
    return img, dens


In [None]:
BATCH_SIZE = 4

train_ds = make_dataset(train_files, BATCH_SIZE, shuffle=True)
test_ds  = make_dataset(test_files,  BATCH_SIZE, shuffle=False)


In [None]:
for x, y in train_ds.take(1):
    print(x.shape, y.shape)
    print("Counts:", tf.reduce_sum(y, axis=[1,2]))


(4, 256, 256, 3) (4, 256, 256, 3)
Counts: tf.Tensor(
[[26. 87.  2.]
 [17. 34.  0.]
 [42. 85.  0.]
 [18.  0.  0.]], shape=(4, 3), dtype=float32)


In [None]:
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=8,
        restore_best_weights=True,
        verbose=1
    ),
    tf.keras.callbacks.ModelCheckpoint(
        filepath="/content/pathonet_best.keras",
        monitor="val_loss",
        save_best_only=True,
        verbose=1
    )
]


In [None]:
model.output_shape

(None, 256, 256, 3)

In [None]:
# model = build_pathonet(input_size=(256,256,3), classes=3)

# model.compile(
#     optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
#     loss="mse"
# )
model = build_pathonet((256,256,3), classes=3)

model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-4),
    loss=weighted_mse
)

history = model.fit(
    train_ds,
    validation_data=test_ds,
    epochs=50,
    callbacks=callbacks
)


NameError: name 'build_pathonet' is not defined

In [None]:
model = tf.keras.models.load_model("/content/pathonet_best.keras")

In [None]:
model.save("/content/drive/MyDrive/pathonet_best.keras")

In [None]:
pred = np.maximum(pred, 0)

In [None]:
import numpy as np
import cv2
from scipy import ndimage as ndi
from skimage.feature import peak_local_max
from skimage.segmentation import watershed

# def watershed_from_density(density_map, threshold=0.1):

#     # Binarize
#     binary = density_map > threshold

#     # Distance transform
#     distance = ndi.distance_transform_edt(binary)

#     # Find peaks
#     coords = peak_local_max(
#         distance,
#         footprint=np.ones((3, 3)),
#         labels=binary
#     )

#     mask = np.zeros(distance.shape, dtype=bool)
#     mask[tuple(coords.T)] = True

#     markers, _ = ndi.label(mask)

#     labels = watershed(-distance, markers, mask=binary)

#     return labels
# def watershed_from_density(density_map, threshold_ratio=0.3):

#     density_map = np.maximum(density_map, 0)

#     threshold = threshold_ratio * density_map.max()

#     binary = density_map > threshold

#     distance = ndi.distance_transform_edt(binary)

#     coords = peak_local_max(
#         distance,
#         footprint=np.ones((3, 3)),
#         labels=binary
#     )

#     mask = np.zeros(distance.shape, dtype=bool)
#     mask[tuple(coords.T)] = True

#     markers, _ = ndi.label(mask)

#     labels = watershed(-distance, markers, mask=binary)

#     return labels
def watershed_from_density(density_map, threshold_ratio=0.45):

    density_map = np.maximum(density_map, 0)

    threshold = threshold_ratio * density_map.max()
    binary = density_map > threshold

    distance = ndi.distance_transform_edt(binary)

    coords = peak_local_max(
        distance,
        min_distance=6,
        labels=binary
    )

    mask = np.zeros(distance.shape, dtype=bool)
    mask[tuple(coords.T)] = True

    markers, _ = ndi.label(mask)

    labels = watershed(-distance, markers, mask=binary)

    return labels

In [None]:
import matplotlib.pyplot as plt
from PIL import Image

img_path = test_files[0][0]

img = Image.open(img_path).convert("RGB")
img = img.resize((256,256))
img = np.array(img) / 255.0
img_input = np.expand_dims(img, axis=0)

pred = model.predict(img_input)[0]

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step


In [None]:
labels_pos = watershed_from_density(pred[:,:,0])
labels_neg = watershed_from_density(pred[:,:,1])
labels_til = watershed_from_density(pred[:,:,2])

count_pos = labels_pos.max()
count_neg = labels_neg.max()
count_til = labels_til.max()

print("Predicted counts:")
print("Positive:", count_pos)
print("Negative:", count_neg)
print("TIL:", count_til)

Predicted counts:
Positive: 9
Negative: 5
TIL: 2


In [None]:
import json

with open(test_files[0][1]) as f:
    gt = json.load(f)

gt_counts = [0,0,0]
for p in gt:
    gt_counts[p["label_id"]-1] += 1

print("Ground Truth:", gt_counts)

Ground Truth: [7, 8, 3]


In [None]:
import numpy as np
from tqdm import tqdm

mae = np.zeros(3)
n = len(test_files)

for img_path, json_path in tqdm(test_files):

    # Load image
    img = Image.open(img_path).convert("RGB")
    img = img.resize((256,256))
    img = np.array(img) / 255.0
    img_input = np.expand_dims(img, axis=0)

    # Predict
    pred = model.predict(img_input, verbose=0)[0]
    pred = np.maximum(pred, 0)

    # Watershed
    labels_pos = watershed_from_density(pred[:,:,0], threshold_ratio=0.45)
    labels_neg = watershed_from_density(pred[:,:,1], threshold_ratio=0.45)
    labels_til = watershed_from_density(pred[:,:,2], threshold_ratio=0.45)

    pred_counts = np.array([
        labels_pos.max(),
        labels_neg.max(),
        labels_til.max()
    ])

    # Ground truth
    with open(json_path) as f:
        gt = json.load(f)

    gt_counts = np.zeros(3)
    for p in gt:
        gt_counts[p["label_id"]-1] += 1

    mae += np.abs(pred_counts - gt_counts)

mae /= n

print("MAE per class:", mae)
print("Total MAE:", mae.sum())

100%|██████████| 700/700 [01:24<00:00,  8.24it/s]

MAE per class: [17.11       38.87857143  9.39571429]
Total MAE: 65.38428571428571





In [None]:
print("Min:", pred.min())
print("Max:", pred.max())
print("Mean:", pred.mean())

Min: -0.060303528
Max: 0.055299234
Mean: 0.0002925928


In [None]:
import os
os.path.exists("/content/pathonet_best.keras")


False

In [None]:
model = tf.keras.models.load_model(
    "/content/pathonet_best.keras",
    compile=False
)


In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-4),
    loss="mse"
)


In [None]:
import numpy as np

pred_counts = []
gt_counts = []

for x, y in test_ds:
    preds = model.predict(x, verbose=0)

    pred_counts.append(np.sum(preds, axis=(1,2)))
    gt_counts.append(np.sum(y.numpy(), axis=(1,2)))

pred_counts = np.concatenate(pred_counts, axis=0)
gt_counts   = np.concatenate(gt_counts, axis=0)

mae_per_class = np.mean(np.abs(pred_counts - gt_counts), axis=0)
total_mae = np.mean(np.abs(np.sum(pred_counts, axis=1) -
                           np.sum(gt_counts, axis=1)))

print("MAE per class:", mae_per_class)
print("Total MAE:", total_mae)


MAE per class: [12.094651 26.43226  16.34894 ]
Total MAE: 29.65804


In [None]:
print("Average GT per class:", np.mean(gt_counts, axis=0))

Average GT per class: [22.503742  46.628265   1.9714019]


In [None]:
avg = np.array([22.5, 46.6, 1.97])
weights = 1 / avg
weights = weights / weights.sum()
print(weights)

[0.07749443 0.03741684 0.88508872]


In [None]:
#copy trọng số ô trên để đưa vào tf.constant()

In [None]:
import tensorflow as tf

class_weights = tf.constant([0.07749443, 0.03741684, 0.88508872], dtype=tf.float32)
# replace W1, W2, W3 with your computed weights

def weighted_mse(y_true, y_pred):
    error = tf.square(y_true - y_pred)
    error = error * class_weights
    return tf.reduce_mean(error)

In [None]:
model = build_pathonet((256,256,3), classes=3)

model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-4),
    loss=weighted_mse
)