<a href="https://colab.research.google.com/github/manushi0304/Diabetic_Retinopathy/blob/main/HybridModelsDenseNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ===== Cell 1: Mount Drive & locate DenseNet =====
import os, fnmatch, datetime
from google.colab import drive
import shutil

# Mount Google Drive and define the project folders used by the rest of the notebook.
mountpoint = '/content/drive'

# SAFETY: never delete through a live mount. When Drive is already mounted, the
# entries under the mountpoint are the user's real Drive folders, and removing
# them would remove Drive data. `force_remount=True` below already covers the
# "already mounted" case, so only stale local leftovers are cleared.
if os.path.ismount(mountpoint):
    print(f"{mountpoint} is already mounted; skipping cleanup (force_remount handles it).")
elif os.path.isdir(mountpoint):
    stale_items = os.listdir(mountpoint)
    if stale_items:
        print(f"Clearing existing mountpoint: {mountpoint}")
    for item in stale_items:
        item_path = os.path.join(mountpoint, item)
        try:
            # Symlinks are unlinked rather than followed.
            if os.path.isdir(item_path) and not os.path.islink(item_path):
                shutil.rmtree(item_path)
            else:
                os.remove(item_path)
        except OSError as e:
            # Best effort: report the problem and continue with the next entry.
            print(f"Error clearing mountpoint: {e}")


drive.mount(mountpoint, force_remount=True)

# Project layout on Drive.
BASE   = f"{mountpoint}/MyDrive/DiabeticProject"
SAVED  = f"{BASE}/saved_models"      # searched for the Keras DenseNet model
TFLITE = f"{BASE}/tflite"            # searched for the fp16 TFLite DenseNet model
HYBRID_SAVE_DIR = f"{BASE}/hybrid_models"  # output folder for exported heads / timing files
os.makedirs(HYBRID_SAVE_DIR, exist_ok=True)

def find_one(root, patterns=(".keras", ".h5"), name_contains=("densenet", "densenet121")):
    """Return the best-matching model file under *root*, or None if none match.

    A file matches when its lower-cased name ends with one of *patterns*
    (a tuple of suffixes) and contains at least one of the *name_contains*
    substrings. Among matches, the most recently modified file wins; ties
    are broken by the shorter full path.
    """
    matches = [
        os.path.join(folder, fname)
        for folder, _subdirs, fnames in os.walk(root)
        for fname in fnames
        if fname.lower().endswith(patterns)
        and any(token in fname.lower() for token in name_contains)
    ]
    if not matches:
        return None
    # prefer newer, then shorter path
    return min(matches, key=lambda path: (-os.path.getmtime(path), len(path)))

def find_tflite(root, name_contains=("densenet", "densenet121"), quant="fp16"):
    """Return the best-matching ``.tflite`` file under *root*, or None.

    A file matches when its lower-cased name ends with ``.tflite``, contains
    at least one of the *name_contains* substrings and contains *quant*.
    The most recently modified match wins; ties go to the shorter path.
    """
    def _is_wanted(fname):
        lowered = fname.lower()
        if not lowered.endswith(".tflite"):
            return False
        return any(token in lowered for token in name_contains) and quant in lowered

    candidates = []
    for folder, _subdirs, fnames in os.walk(root):
        candidates.extend(os.path.join(folder, fname) for fname in fnames if _is_wanted(fname))

    if not candidates:
        return None
    return min(candidates, key=lambda path: (-os.path.getmtime(path), len(path)))

# Pick DenseNet files: newest Keras model under SAVED, newest fp16 TFLite under TFLITE.
_densenet_tokens = ("densenet", "densenet121")
DENSENET_KERAS_PATH = find_one(SAVED, (".keras", ".h5"), _densenet_tokens)
DENSENET_TFLITE_FP16 = find_tflite(TFLITE, _densenet_tokens, "fp16")

# Report what was found; "exists" is None when nothing matched.
print("=== DenseNet paths ===")
for _label, _found in (
    ("DENSENET_KERAS_PATH  :", DENSENET_KERAS_PATH),
    ("DENSENET_TFLITE_FP16 :", DENSENET_TFLITE_FP16),
):
    print(_label, _found, "exists:", os.path.exists(_found) if _found else None)
print("HYBRID_SAVE_DIR      :", HYBRID_SAVE_DIR)

# The Keras model is mandatory; stop the notebook early when it is missing.
if not DENSENET_KERAS_PATH:
    _missing_msg = ("‚ùå DenseNet Keras model not found under saved_models. "
                    "Please ensure a file like 'DenseNet121_*.keras' exists.")
    raise SystemExit(_missing_msg)

Mounted at /content/drive
=== DenseNet paths ===
DENSENET_KERAS_PATH  : /content/drive/MyDrive/DiabeticProject/saved_models/DenseNet121_single_split.keras exists: True
DENSENET_TFLITE_FP16 : /content/drive/MyDrive/DiabeticProject/tflite/DenseNet121_model_fp16.tflite exists: True
HYBRID_SAVE_DIR      : /content/drive/MyDrive/DiabeticProject/hybrid_models


In [None]:
# ===== Cell 2: Locate APTOS Binary via KaggleHub & build dataframe =====
import os, re, zipfile, pandas as pd
from sklearn.model_selection import train_test_split

# Download the dataset through KaggleHub only when an earlier run has not
# already left its local path in this session.
if "subhajeetdas_aptos_2019_jpg_path" not in globals():
    import kagglehub
    subhajeetdas_aptos_2019_jpg_path = kagglehub.dataset_download('subhajeetdas/aptos-2019-jpg')

print("KaggleHub root:", subhajeetdas_aptos_2019_jpg_path)

def _norm(s):
    """Return *s* trimmed and lower-cased, with each run of underscores/whitespace replaced by one space."""
    lowered = s.strip().lower()
    return re.sub(r'[_\s]+', ' ', lowered)
def _has_binary(dirpath):
    """Return True when *dirpath* directly contains both a 'DR' and a 'No DR' folder.

    Folder names are compared after `_norm`, so case, underscores and extra
    spaces do not matter. Any error while listing the directory yields False.
    """
    try:
        subfolders = set()
        for entry in os.listdir(dirpath):
            if os.path.isdir(os.path.join(dirpath, entry)):
                subfolders.add(_norm(entry))
    except Exception:
        return False
    return {'dr', 'no dr'} <= subfolders

def _maybe_unzip_here(root):
    """Extract every APTOS "binary" zip found directly inside *root*.

    Only files whose lower-cased name ends with ``.zip`` and contains both
    ``aptos`` and ``binary`` are considered. Each archive is extracted into a
    sibling folder named after the archive (without extension); archives
    whose target folder already exists are skipped.
    """
    for entry in os.listdir(root):
        lowered = entry.lower()
        is_target = lowered.endswith(".zip") and "aptos" in lowered and "binary" in lowered
        if not is_target:
            continue
        archive = os.path.join(root, entry)
        dest = os.path.join(root, os.path.splitext(entry)[0])
        if os.path.isdir(dest):
            continue  # already extracted
        print("üì¶ Extracting:", archive)
        with zipfile.ZipFile(archive, 'r') as zf:
            zf.extractall(dest)

def find_binary_root():
    """Locate the folder that directly contains the 'DR' and 'No DR' class folders.

    Search order:
      1. the KaggleHub download root (after extracting matching zips there) and
         its "APTOS 2019 (Original) (Binary)" sub-folder;
      2. a full walk of the download root, extracting matching zips on the way;
      3. two known copies on Google Drive.

    Returns:
        The path of the first folder for which `_has_binary` is true.

    Raises:
        SystemExit: if no such folder is found anywhere.
    """
    root = subhajeetdas_aptos_2019_jpg_path
    _maybe_unzip_here(root)
    cands = [
        os.path.join(root, "APTOS 2019 (Original) (Binary)"),
        root,
    ]
    for c in cands:
        if os.path.isdir(c) and _has_binary(c): return c
    for dirpath, dirnames, files in os.walk(root):
        if _has_binary(dirpath): return dirpath
        if any(f.lower().endswith(".zip") for f in files):
            _maybe_unzip_here(dirpath)
            # os.walk (top-down) listed `dirnames` before the extraction above, so
            # folders created by it would be skipped. Appending them in place makes
            # the walk descend into them as well.
            known = set(dirnames)
            for entry in sorted(os.listdir(dirpath)):
                if entry not in known and os.path.isdir(os.path.join(dirpath, entry)):
                    dirnames.append(entry)
    # also check Drive copies
    for c in [
        "/content/drive/MyDrive/DiabeticProject/APTOS 2019 (Original) (Binary)",
        "/content/drive/MyDrive/APTOS 2019 (Original) (Binary)"
    ]:
        if os.path.isdir(c) and _has_binary(c): return c
    raise SystemExit("‚ùå Could not locate the APTOS Binary dataset.")

base_path = find_binary_root()
print("‚úÖ Using dataset root:", base_path)
print("Class folders:", os.listdir(base_path))

# Build df: one (image_path, label) row per image file, label 1 = DR, 0 = No DR.
exts = ('.png','.jpg','.jpeg','.bmp','.tif','.tiff','.webp')
_label_of = {'dr': 1, 'no dr': 0}
rows = []
for cls in os.listdir(base_path):
    d = os.path.join(base_path, cls)
    # Non-folders and folders that are neither class are ignored.
    lab = _label_of.get(_norm(cls)) if os.path.isdir(d) else None
    if lab is None:
        continue
    rows.extend(
        (os.path.join(d, fname), lab)
        for fname in os.listdir(d)
        if fname.lower().endswith(exts)
    )
df = pd.DataFrame(rows, columns=['image_path','label'])

# quick 70/15/15 (we'll redo with dedup & grouped next)
train_df, temp_df = train_test_split(
    df, test_size=0.30, stratify=df['label'], random_state=42
)
valid_df, test_df = train_test_split(
    temp_df, test_size=0.50, stratify=temp_df['label'], random_state=42
)
for name, d in (("Train", train_df), ("Valid", valid_df), ("Test", test_df)):
    print(f"{name}: total={len(d)} | No DR={(d['label']==0).sum()} | DR={(d['label']==1).sum()}")


Downloading from https://www.kaggle.com/api/v1/datasets/download/subhajeetdas/aptos-2019-jpg?dataset_version_number=12...


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 2.82G/2.82G [00:34<00:00, 88.3MB/s]

Extracting files...





KaggleHub root: /root/.cache/kagglehub/datasets/subhajeetdas/aptos-2019-jpg/versions/12
‚úÖ Using dataset root: /root/.cache/kagglehub/datasets/subhajeetdas/aptos-2019-jpg/versions/12/APTOS 2019 (Original) (Binary)
Class folders: ['No DR', 'Details.txt', 'DR']
Train: total=2563 | No DR=1263 | DR=1300
Valid: total=549 | No DR=271 | DR=278
Test: total=550 | No DR=271 | DR=279


In [None]:
# ===== Cell 3: Deduplicate + grouped split (no leakage) =====
import os, re, hashlib, numpy as np, pandas as pd
from PIL import Image
from sklearn.model_selection import GroupShuffleSplit

# Merge the quick splits back into a single pool (one row per distinct path)
# so that deduplication and the grouped split below start from all images.
pool = pd.concat([train_df, valid_df, test_df], ignore_index=True)
pool = pool.drop_duplicates("image_path")
pool["label"] = pool["label"].astype(int)

def sha1(fp, chunk=1<<20):
    """Return the hexadecimal SHA-1 digest of the file at *fp*.

    The file is read in pieces of *chunk* bytes so large files are never
    loaded into memory at once.
    """
    digest = hashlib.sha1()
    with open(fp, 'rb') as stream:
        # iter(callable, sentinel) stops at the first empty read (end of file).
        for piece in iter(lambda: stream.read(chunk), b''):
            digest.update(piece)
    return digest.hexdigest()

def ahash(path, n=8):
    """Return an average-hash fingerprint of the image at *path*, or None.

    The image is converted to grayscale and resized to n x n; each pixel
    becomes a 1 bit when it is brighter than the mean. Bits are packed
    most-significant-bit first into a tuple of byte values. None is returned
    when the image cannot be opened or converted.
    """
    try:
        thumb = Image.open(path).convert('L').resize((n,n))
    except Exception:
        return None
    pixels = np.asarray(thumb, dtype=np.float32)
    bits = (pixels > pixels.mean()).astype(np.uint8).ravel()
    packed = []
    for start in range(0, bits.size, 8):
        # A short final group simply contributes fewer set bits.
        octet = bits[start:start + 8]
        packed.append(sum(1 << (7 - pos) for pos, bit in enumerate(octet) if bit))
    return tuple(packed)

print("üîç SHA1 ‚Ä¶")
pool["sha1"] = pool["image_path"].apply(sha1)
pool = pool.drop_duplicates("sha1").reset_index(drop=True)

print("üîç aHash ‚Ä¶")
pool["ahash"] = pool["image_path"].apply(ahash)
pool = pool[pool["ahash"].notna()].drop_duplicates("ahash").reset_index(drop=True)

def pid_from_path(p):
    """Return the grouping id for image path *p*.

    The id is the file name without directory and extension, with a trailing
    ``_left`` / ``_right`` suffix (any letter case) removed.
    """
    filename = os.path.basename(p)
    stem, _ext = os.path.splitext(filename)
    return re.sub(r'_(left|right)$', '', stem, flags=re.IGNORECASE)

# Grouping key per image (file stem without a _left/_right suffix); all images
# sharing a key are kept in the same split.
pool["group"] = pool["image_path"].apply(pid_from_path)

# First cut: train (train_size=0.70) vs. a temporary remainder, split by group.
gss = GroupShuffleSplit(n_splits=1, train_size=0.70, random_state=42)
i_tr, i_tmp = next(gss.split(pool, groups=pool["group"]))
df_tr, df_tmp = pool.iloc[i_tr].reset_index(drop=True), pool.iloc[i_tmp].reset_index(drop=True)

# Second cut: halve the remainder into validation and test, again by group.
gss2 = GroupShuffleSplit(n_splits=1, train_size=0.50, random_state=42)
i_va, i_te = next(gss2.split(df_tmp, groups=df_tmp["group"]))
df_va, df_te = df_tmp.iloc[i_va].reset_index(drop=True), df_tmp.iloc[i_te].reset_index(drop=True)

# Print the row count, per-class counts and number of distinct groups of split `d`.
def _cnt(tag, d):
    print(f"{tag}: total={len(d)} | No DR={(d['label']==0).sum()} | DR={(d['label']==1).sum()} | groups={d['group'].nunique()}")
print("\n‚úÖ Post-dedup grouped split:")
_cnt("Train", df_tr); _cnt("Valid", df_va); _cnt("Test ", df_te)
# Sanity check: number of groups shared by train/valid, train/test and valid/test.
print("Group overlaps:",
      len(set(df_tr.group)&set(df_va.group)),
      len(set(df_tr.group)&set(df_te.group)),
      len(set(df_va.group)&set(df_te.group)))

# Replace the quick splits from the previous cell; the hash columns were only
# needed for deduplication.
train_df, valid_df, test_df = df_tr.drop(columns=["sha1","ahash"]), df_va.drop(columns=["sha1","ahash"]), df_te.drop(columns=["sha1","ahash"])


üîç SHA1 ‚Ä¶
üîç aHash ‚Ä¶

‚úÖ Post-dedup grouped split:
Train: total=1244 | No DR=522 | DR=722 | groups=1210
Valid: total=262 | No DR=126 | DR=136 | groups=259
Test : total=266 | No DR=115 | DR=151 | groups=260
Group overlaps: 0 0 0


In [None]:
# ===== Cell 4 (TFLite-FIRST): extract features from your FULL fp16 TFLite by tapping pooled tensor =====
import os, glob, numpy as np, tensorflow as tf
from PIL import Image

# EXPECTED GLOBALS: DENSENET_TFLITE_FP16, train_df, valid_df, test_df
# Fail fast when no fp16 TFLite model was located (path is None/empty) or the
# file has disappeared since it was located.
assert DENSENET_TFLITE_FP16 and os.path.exists(DENSENET_TFLITE_FP16), \
    f"Full fp16 TFLite not found: {DENSENET_TFLITE_FP16}"

def _load_tflite(path):
    """Create a TFLite interpreter for *path* with its tensors allocated.

    The interpreter is used to read an *intermediate* tensor (the pooled
    feature vector) after `invoke()`. TFLite only guarantees the contents of
    intermediate tensors when `experimental_preserve_all_tensors=True`;
    otherwise their buffers may be reused by later ops and reads return
    undefined values. The flag also disables the default delegates, so
    inference is somewhat slower than with a plain interpreter.
    """
    inter = tf.lite.Interpreter(
        model_path=path,
        num_threads=os.cpu_count() or 1,
        experimental_preserve_all_tensors=True,
    )
    inter.allocate_tensors()
    return inter

def _prep(img_path, W, H, expect_dtype):
    """Load an image as a model-ready batch of one.

    The image is converted to RGB, resized to (W, H) with bilinear
    interpolation, scaled to [0, 1], given a leading batch axis and cast to
    *expect_dtype*.
    """
    rgb = Image.open(img_path).convert("RGB")
    resized = rgb.resize((W, H), Image.BILINEAR)
    scaled = np.asarray(resized, dtype=np.float32) / 255.0
    batch = scaled[np.newaxis, ...]
    return batch.astype(expect_dtype)

def _find_feature_index(inter, preferred_dims=(1024, 1280, 1408, 1536)):
    """Return the index of the tensor to use as the feature vector.

    Selection order among rank-1/rank-2 tensors reported by
    `inter.get_tensor_details()`:
      1. the last tensor whose name contains a pooling keyword;
      2. otherwise the last tensor whose final dimension is in *preferred_dims*;
      3. otherwise the model's first output tensor.
    The chosen tensor (or the fallback) is reported on stdout.
    """
    pool_markers = ("global_average_pool","global_max_pool","avg_pool","max_pool","gap")

    def _label(info):
        raw = info["name"]
        return raw.decode() if isinstance(raw, bytes) else raw

    def _dims(info):
        return tuple(int(s) for s in info.get("shape_signature", info["shape"]))

    described = [(_label(info), _dims(info), info) for info in inter.get_tensor_details()]

    # try names first (scan from the end so the last match wins)
    chosen = next(
        (info for label, dims, info in reversed(described)
         if len(dims) in (1, 2) and any(marker in label.lower() for marker in pool_markers)),
        None,
    )
    # then by dims fallback
    if chosen is None:
        chosen = next(
            (info for _label_unused, dims, info in reversed(described)
             if len(dims) in (1, 2) and dims[-1] in preferred_dims),
            None,
        )
    if chosen is None:
        print("‚ö†Ô∏è No internal pool tensor matched ‚Äî falling back to model output.")
        return inter.get_output_details()[0]["index"]
    print(f"üîé Using feature tensor idx={chosen['index']} name={chosen.get('name')} shape={chosen.get('shape')}")
    return chosen["index"]

def extract_features_from_full_tflite(tflite_path, df):
    """Run every image of *df* through the TFLite model and collect features.

    Args:
        tflite_path: path of the TFLite model to load.
        df: DataFrame with an ``image_path`` column and a ``label`` column.

    Returns:
        (X, y): X stacks one flattened feature vector per image (rows follow
        the order of ``df["image_path"]``); y holds the labels as integers.
    """
    interp = _load_tflite(tflite_path)
    in_spec = interp.get_input_details()[0]
    # Input shape is read as (batch, height, width, channels).
    height, width, _channels = (int(in_spec["shape"][axis]) for axis in (1, 2, 3))
    tap = _find_feature_index(interp, preferred_dims=(1024,))  # DenseNet121 penultimate = 1024
    labels = df["label"].astype(int).to_numpy()

    def _embed(image_path):
        interp.set_tensor(in_spec["index"], _prep(image_path, width, height, in_spec["dtype"]))
        interp.invoke()
        return interp.get_tensor(tap).reshape(-1)

    features = np.vstack([_embed(path) for path in df["image_path"]])
    print("üìê Feature matrix:", features.shape, "from", os.path.basename(tflite_path))
    return features, labels

print("‚úÖ Using FULL TFLite for feature extraction:", DENSENET_TFLITE_FP16)
# Features/labels for train, validation and test, extracted in that order.
(Xd_tr, yd_tr), (Xd_va, yd_va), (Xd_te, yd_te) = (
    extract_features_from_full_tflite(DENSENET_TFLITE_FP16, split_df)
    for split_df in (train_df, valid_df, test_df)
)


‚úÖ Using FULL TFLite for feature extraction: /content/drive/MyDrive/DiabeticProject/tflite/DenseNet121_model_fp16.tflite


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


üîé Using feature tensor idx=954 name=functional_1/densenet121_1/max_pool_1/Max shape=[   1 1024]
üìê Feature matrix: (1244, 1024) from DenseNet121_model_fp16.tflite
üîé Using feature tensor idx=954 name=functional_1/densenet121_1/max_pool_1/Max shape=[   1 1024]


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


üìê Feature matrix: (262, 1024) from DenseNet121_model_fp16.tflite
üîé Using feature tensor idx=954 name=functional_1/densenet121_1/max_pool_1/Max shape=[   1 1024]


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


üìê Feature matrix: (266, 1024) from DenseNet121_model_fp16.tflite


In [None]:
# ===== Cell 5B: All-TFLite hybrid (backbone fp16 .tflite + head .tflite) + 10-image avg latency =====
import os, time, json, numpy as np, pandas as pd
import tensorflow as tf
from PIL import Image

# Reuse from your session:
# - DENSENET_TFLITE_FP16: path to DenseNet121_model_fp16.tflite
# - final_pipe: your chosen sklearn head (SVM_RBF per your log)
# - Xd_tr, yd_tr, Xd_va, yd_va: feature/label splits from Cell 4
# - test_df: has 'image_path' column

# The backbone model file must be present before anything is exported or timed.
assert os.path.exists(DENSENET_TFLITE_FP16), f"Missing backbone TFLite: {DENSENET_TFLITE_FP16}"

# TAG prefixes every file written by this cell; outputs go to the hybrid folder.
TAG = "DenseNet121_fp16_FE"
SAVE_DIR = HYBRID_SAVE_DIR
os.makedirs(SAVE_DIR, exist_ok=True)

# --- 1) Build fused head weights (scaler + PCA + LR) from the chosen sklearn pipe ---
from sklearn.linear_model import LogisticRegression

def _transform(pipe, X):
    """Apply the pipeline's ``scaler`` and then ``pca`` step to *X*.

    Steps missing from ``pipe.named_steps`` are skipped; *X* is returned
    unchanged when neither is present.
    """
    out = X
    for step_name in ("scaler", "pca"):
        if step_name in pipe.named_steps:
            out = pipe.named_steps[step_name].transform(out)
    return out

# Distill to logistic regression on the teacher's transformed space
X_train = np.vstack([Xd_tr, Xd_va])
y_train = np.concatenate([yd_tr, yd_va])
Z_train = _transform(final_pipe, X_train)

try:
    # Student is trained on the teacher's hard decisions (probability >= 0.5).
    prob = final_pipe.predict_proba(X_train)[:, 1]
    y_hard = (prob >= 0.5).astype(int)
    lr = LogisticRegression(max_iter=2000, random_state=42).fit(Z_train, y_hard)
except Exception as e:
    # Best-effort fallback (e.g. teacher has no predict_proba, or it predicts a
    # single class): train on the ground-truth labels, but say so.
    print(f"Teacher labels unusable ({type(e).__name__}: {e}); fitting student on y_train instead.")
    lr = LogisticRegression(max_iter=2000, random_state=42).fit(Z_train, y_train)

# Fuse scaler + PCA + LR  -> single affine W_fused, b_fused  that consumes raw features (dim = 1024)
#   x' = (x - mean) / scale                      (scaler)
#   z  = (x' - pca.mean_) @ P.T [/ sqrt(var)]    (PCA, optional whitening)
#   logit = z @ w + b                            (logistic regression)
scaler = final_pipe.named_steps.get("scaler")
pca    = final_pipe.named_steps.get("pca")

scale = getattr(scaler, "scale_", None) if scaler is not None else None
mean  = getattr(scaler, "mean_",  None) if scaler is not None else None
P     = pca.components_ if pca is not None else None

# The algebra is done in float64 and cast to float32 once at the end.
Wlr = lr.coef_.reshape(-1, 1).astype(np.float64)
blr = float(lr.intercept_.ravel()[0])

b_shift = 0.0
if P is not None:
    w_pca = Wlr
    if getattr(pca, "whiten", False):
        # Whitening divides each component by sqrt(explained variance).
        comp_scale = np.maximum(np.sqrt(pca.explained_variance_), np.finfo(np.float64).eps)
        w_pca = Wlr / comp_scale.reshape(-1, 1)
    Wp = P.T @ w_pca
    pca_mean = getattr(pca, "mean_", None)
    if pca_mean is not None:
        # PCA centres its input; fold that offset into the bias.
        b_shift -= float(np.dot(pca_mean, Wp.ravel()))
else:
    Wp = Wlr
Wf = (Wp / np.asarray(scale, dtype=np.float64).reshape(-1, 1)) if scale is not None else Wp
if mean is not None:
    b_shift -= float(np.dot(mean, Wf.ravel()))

Wf = Wf.astype(np.float32)
bf = np.array([blr + b_shift], dtype=np.float32)

# Self-check: the fused affine map must reproduce the student's logits.
_ref_logits = (Z_train @ lr.coef_.T + lr.intercept_).ravel()
_fused_logits = (X_train.astype(np.float64) @ Wf.astype(np.float64) + float(bf[0])).ravel()
print("Fusion check: max |fused - pipeline logit| =", float(np.max(np.abs(_ref_logits - _fused_logits))))

D_in = Wf.shape[0]
print(f"Head fused: input dim = {D_in}, weights shape = {Wf.shape}, bias = {bf.shape}")

# --- 2) Export HEAD as pure TFLite (no Keras): tf.Module -> ConcreteFunction -> TFLite ---
head_tfl_path = os.path.join(SAVE_DIR, f"{TAG}_HEAD_fused_fp32.tflite")

class HeadDenseSigmoid(tf.Module):
    """Single dense layer followed by a sigmoid, with fixed (non-trainable) weights."""

    def __init__(self, W, b):
        """Store weight matrix *W* and bias *b* as float32 variables."""
        super().__init__()
        # Use tf.Variable so the converter captures constants
        self.W = tf.Variable(W, trainable=False, dtype=tf.float32, name="W")
        self.b = tf.Variable(b, trainable=False, dtype=tf.float32, name="b")

    # The signature fixes the input to one row of D_in float32 features;
    # D_in is read from the enclosing scope when the class is defined.
    @tf.function(input_signature=[tf.TensorSpec([1, D_in], tf.float32, name="features")])
    def __call__(self, features):
        """Return ``{"prob": sigmoid(features @ W + b)}``."""
        logits = tf.linalg.matmul(features, self.W) + self.b  # [1,1]
        prob = tf.math.sigmoid(logits)
        return {"prob": prob}

# Trace the head once and convert the traced function to a TFLite flatbuffer.
head_module = HeadDenseSigmoid(Wf, bf)
concrete = head_module.__call__.get_concrete_function()
# Pass the owning module as `trackable_obj` (as the multi-head export in this
# notebook does) so the converter holds a reference to the module's variables.
conv = tf.lite.TFLiteConverter.from_concrete_functions([concrete], trackable_obj=head_module)
# (We keep head in fp32 to avoid any numerical surprises; it is tiny anyway)
head_tfl_bytes = conv.convert()
with open(head_tfl_path, "wb") as f:
    f.write(head_tfl_bytes)
print("‚úÖ Wrote HEAD TFLite:", head_tfl_path, "|", round(os.path.getsize(head_tfl_path)/(1024*1024), 3), "MB")

# --- 3) Helpers for timing both TFLites end-to-end ---
def _load_interp(path, preserve_all_tensors=True):
    """Create a TFLite interpreter for *path* with its tensors allocated.

    Args:
        path: TFLite model file.
        preserve_all_tensors: keep intermediate tensors readable after
            `invoke()`. This is required when a pooled feature tensor is read
            from inside the backbone; without it TFLite may reuse those
            buffers and reads return undefined values. Enabling it disables
            the default delegates, so measured latency is higher than for a
            plain interpreter.
    """
    inter = tf.lite.Interpreter(
        model_path=path,
        num_threads=os.cpu_count() or 1,
        experimental_preserve_all_tensors=preserve_all_tensors,
    )
    inter.allocate_tensors()
    return inter

def _prep_img(img_path, W, H, dtype):
    """Return the image at *img_path* as a (1, H, W, 3) batch in [0, 1] cast to *dtype*.

    The image is converted to RGB and resized to (W, H) with bilinear
    interpolation before scaling.
    """
    picture = Image.open(img_path).convert("RGB").resize((W, H), Image.BILINEAR)
    pixels = np.asarray(picture, dtype=np.float32)
    pixels = pixels / 255.0
    return np.expand_dims(pixels, 0).astype(dtype)

# find pooled features index (same logic as Cell 4)
def _find_feature_index(inter):
    """Return the index of the backbone tensor holding the pooled features.

    Among rank-1/rank-2 tensors: the last one whose name contains a pooling
    keyword; otherwise the first one whose final dimension equals the global
    `D_in`; otherwise the model's first output tensor.
    """
    pool_markers = ("global_average_pool","max_pool","avg_pool","gap")

    def _shape_of(info):
        return tuple(int(s) for s in info.get("shape_signature", info["shape"]))

    def _name_of(info):
        raw = info["name"]
        return raw.decode() if isinstance(raw, bytes) else raw

    tensors = inter.get_tensor_details()

    picked = None
    for info in tensors:
        label, dims = _name_of(info), _shape_of(info)
        if len(dims) not in (1, 2):
            continue
        if any(marker in label.lower() for marker in pool_markers):
            picked = info  # keep scanning: the last match wins
    if picked is not None:
        return picked["index"]

    for info in tensors:
        dims = _shape_of(info)
        if len(dims) in (1, 2) and dims[-1] == D_in:
            return info["index"]
    return inter.get_output_details()[0]["index"]

# --- 4) Build 10-image sample from your dataset (test_df) ---
# Slicing already copes with fewer than 10 rows.
sample_paths = list(test_df["image_path"])[:10]
assert len(sample_paths) > 0, "No images found in test_df['image_path'] to time."

# --- 5) End-to-end timing (backbone fp16 TFLite -> features -> head TFLite) ---
backbone = _load_interp(DENSENET_TFLITE_FP16)
b_in = backbone.get_input_details()[0]
b_H, b_W = int(b_in["shape"][1]), int(b_in["shape"][2])
feat_idx = _find_feature_index(backbone)

head = _load_interp(head_tfl_path)
h_in = head.get_input_details()[0]
h_out = head.get_output_details()[0]

def _run_hybrid_once(a):
    """Run backbone then head on the prepared batch *a* and return the head output."""
    backbone.set_tensor(b_in["index"], a); backbone.invoke()
    f = backbone.get_tensor(feat_idx).astype(np.float32).reshape(1, D_in)
    head.set_tensor(h_in["index"], f); head.invoke()
    # Actually read the result: get_output_details() alone only returns
    # metadata and would leave the output fetch out of the measurement.
    return head.get_tensor(h_out["index"])

# warm-ups
for _ in range(3):
    _run_hybrid_once(_prep_img(sample_paths[0], b_W, b_H, b_in["dtype"]))

times = []
for p in sample_paths:
    a = _prep_img(p, b_W, b_H, b_in["dtype"])  # image decoding/resizing is not timed
    t0 = time.perf_counter()
    _run_hybrid_once(a)
    t1 = time.perf_counter()
    times.append((t1 - t0) * 1000.0)

t = np.array(times, np.float64)
lat = {
    "N": len(sample_paths),
    "avg_ms": round(float(t.mean()), 3),
    "std_ms": round(float(t.std(ddof=1)), 3) if len(t) > 1 else 0.0,
    "min_ms": round(float(t.min()), 3),
    "max_ms": round(float(t.max()), 3),
}
df_latency = pd.DataFrame([{**{"BackboneTFLite": DENSENET_TFLITE_FP16, "HeadTFLite": head_tfl_path}, **lat}])
display(df_latency)

# Save timing + a tiny manifest
lat_csv = os.path.join(SAVE_DIR, f"{TAG}_ALL_TFLite_hybrid_latency.csv")
df_latency.to_csv(lat_csv, index=False)

with open(os.path.join(SAVE_DIR, f"{TAG}_ALL_TFLite_hybrid_manifest.json"), "w") as f:
    json.dump({
        "backbone_tflite": DENSENET_TFLITE_FP16,
        "head_tflite": head_tfl_path,
        "feature_dim": int(D_in),
        "images_timed": len(sample_paths),
        "latency_ms": lat
    }, f, indent=2)

print("\n‚úÖ Saved:")
print(" -", head_tfl_path)
print(" -", lat_csv)




Head fused: input dim = 1024, weights shape = (1024, 1), bias = (1,)
‚úÖ Wrote HEAD TFLite: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_fused_fp32.tflite | 0.005 MB


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


Unnamed: 0,BackboneTFLite,HeadTFLite,N,avg_ms,std_ms,min_ms,max_ms
0,/content/drive/MyDrive/DiabeticProject/tflite/...,/content/drive/MyDrive/DiabeticProject/hybrid_...,10,335.908,194.006,139.245,708.032



‚úÖ Saved:
 - /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_fused_fp32.tflite
 - /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_ALL_TFLite_hybrid_latency.csv


In [None]:
# ===== Cell 5C: Export THREE head TFLites (from KNN/SVM/RF) + optional timing =====
import os, time, json, numpy as np, pandas as pd, tensorflow as tf
from sklearn.linear_model import LogisticRegression
from PIL import Image

# TAG prefixes every file written by this cell; outputs go to the hybrid folder.
TAG = "DenseNet121_fp16_FE"
SAVE_DIR = HYBRID_SAVE_DIR
os.makedirs(SAVE_DIR, exist_ok=True)

# Reuse trained heads from Cell 5:
#   heads = {"KNN": knn, "SVM_RBF": svm, "RandomForest": rf}
# Reuse splits & backbone:
#   Xd_tr, yd_tr, Xd_va, yd_va, test_df, DENSENET_TFLITE_FP16

def _transform(pipe, X):
    """Return *X* after the pipeline's ``scaler`` and ``pca`` steps (in that order).

    A step that is absent from ``pipe.named_steps`` is skipped.
    """
    steps = pipe.named_steps
    stages = [steps[key] for key in ("scaler", "pca") if key in steps]
    Z = X
    for stage in stages:
        Z = stage.transform(Z)
    return Z

def fuse_head(pipe, X_train, y_train):
    """Distill *pipe* into one affine map that consumes raw features.

    A logistic-regression student is fitted in the pipeline's transformed
    space (after ``scaler``/``pca``) on the teacher's hard decisions, falling
    back to *y_train* when those cannot be obtained. Scaler, PCA and student
    are then folded into a single weight vector and bias:

        x' = (x - mean) / scale                     (scaler, if present)
        z  = (x' - pca.mean_) @ P.T [/ sqrt(var)]   (PCA, optional whitening)
        logit = z @ w + b

    Args:
        pipe: fitted sklearn pipeline exposing ``named_steps``.
        X_train: raw feature matrix used to fit the student.
        y_train: ground-truth labels, used only by the fallback.

    Returns:
        (Wf, bf): float32 weights of shape (n_features, 1) and float32 bias
        of shape (1,) such that ``sigmoid(x @ Wf + bf)`` is the student's
        probability for raw features ``x``.
    """
    # Distill to LR on teacher's transformed space
    Z_train = _transform(pipe, X_train)
    lr = LogisticRegression(max_iter=2000, random_state=42)
    try:
        prob = pipe.predict_proba(X_train)[:, 1]
        y_hard = (prob >= 0.5).astype(int)
        lr.fit(Z_train, y_hard)
    except Exception as e:
        # Best-effort fallback (no predict_proba, single-class teacher, ...),
        # reported instead of being swallowed silently.
        print(f"fuse_head: teacher labels unusable ({type(e).__name__}: {e}); fitting on y_train instead.")
        lr.fit(Z_train, y_train)

    scaler = pipe.named_steps.get("scaler")
    pca    = pipe.named_steps.get("pca")
    scale  = getattr(scaler, "scale_", None) if scaler is not None else None
    mean   = getattr(scaler, "mean_",  None) if scaler is not None else None

    # Work in float64 and cast once at the end.
    w = lr.coef_.reshape(-1, 1).astype(np.float64)
    bias = float(lr.intercept_.ravel()[0])

    if pca is not None:
        if getattr(pca, "whiten", False):
            # Whitening divides each component by sqrt(explained variance).
            comp_scale = np.maximum(np.sqrt(pca.explained_variance_), np.finfo(np.float64).eps)
            w = w / comp_scale.reshape(-1, 1)
        w = pca.components_.T @ w
        pca_mean = getattr(pca, "mean_", None)
        if pca_mean is not None:
            # PCA centres its input; fold that offset into the bias.
            bias -= float(np.dot(pca_mean, w.ravel()))
    if scale is not None:
        w = w / np.asarray(scale, dtype=np.float64).reshape(-1, 1)
    if mean is not None:
        bias -= float(np.dot(mean, w.ravel()))

    Wf = w.astype(np.float32)
    bf = np.array([bias], dtype=np.float32)
    return Wf, bf

def export_head_tflite(Wf, bf, out_path):
    """Write a dense+sigmoid head with weights *Wf* and bias *bf* as a TFLite file.

    The exported model takes one float32 row of ``Wf.shape[0]`` features and
    returns ``sigmoid(features @ Wf + bf)`` under the key ``"prob"``.
    Returns *out_path*.
    """
    D_in = Wf.shape[0]

    class HeadDenseSigmoid(tf.Module):
        """Dense layer + sigmoid with fixed (non-trainable) weights."""
        def __init__(self, W, b):
            super().__init__()
            self.W = tf.Variable(W, trainable=False, dtype=tf.float32, name="W")
            self.b = tf.Variable(b, trainable=False, dtype=tf.float32, name="b")
        # Input is fixed to a single row of D_in float32 features.
        @tf.function(input_signature=[tf.TensorSpec([1, D_in], tf.float32, name="features")])
        def __call__(self, features):
            logits = tf.linalg.matmul(features, self.W) + self.b
            prob = tf.math.sigmoid(logits)
            return {"prob": prob}

    module = HeadDenseSigmoid(Wf, bf)
    concrete = module.__call__.get_concrete_function()
    # The module is passed as trackable_obj together with its traced function.
    conv = tf.lite.TFLiteConverter.from_concrete_functions([concrete], trackable_obj=module)
    tbytes = conv.convert()
    with open(out_path, "wb") as f: f.write(tbytes)
    print("üíæ Head TFLite:", out_path, "|", round(os.path.getsize(out_path)/(1024*1024),3), "MB")
    return out_path

# Prepare train data to fit LR students (train + validation features together)
X_train = np.vstack([Xd_tr, Xd_va])
y_train = np.concatenate([yd_tr, yd_va])

# One fused TFLite head per trained sklearn head; maps head name -> file path.
head_paths = {}
for name, pipe in heads.items():
    out_path = os.path.join(SAVE_DIR, f"{TAG}_HEAD_from_{name}_fused_fp32.tflite")
    Wf, bf = fuse_head(pipe, X_train, y_train)
    head_paths[name] = export_head_tflite(Wf, bf, out_path)

# (Optional) quick end-to-end timing for each head on 10 test images
def _prep_img(p, W, H, dtype):
    """Load image *p* as a batch of one: RGB, resized to (W, H), scaled to [0, 1], cast to *dtype*."""
    with_rgb = Image.open(p).convert("RGB")
    resized = with_rgb.resize((W, H), Image.BILINEAR)
    unit_range = np.asarray(resized, dtype=np.float32) / 255.0
    return unit_range[np.newaxis, ...].astype(dtype)

def _find_feat_idx(inter, d_in_guess):
    """Return the index of the tensor to tap for pooled features.

    Only rank-1/rank-2 tensors are considered. The last tensor whose name
    contains a pooling keyword is preferred; failing that, the first tensor
    whose final dimension equals *d_in_guess*; failing that, the model's
    first output tensor.
    """
    pool_markers = ("global_average_pool","max_pool","avg_pool","gap")

    def _dims(info):
        return tuple(int(s) for s in info.get("shape_signature", info["shape"]))

    def _label(info):
        raw = info["name"]
        return raw.decode() if isinstance(raw, bytes) else raw

    tensors = inter.get_tensor_details()
    annotated = [(_label(info), _dims(info), info) for info in tensors]

    named = [
        info for label, dims, info in annotated
        if len(dims) in (1, 2) and any(marker in label.lower() for marker in pool_markers)
    ]
    if named:
        return named[-1]["index"]

    by_width = next(
        (info for info in tensors
         if len(_dims(info)) in (1, 2) and _dims(info)[-1] == d_in_guess),
        None,
    )
    if by_width is not None:
        return by_width["index"]
    return inter.get_output_details()[0]["index"]

def time_two_tflites(backbone_path, head_path, img_paths, warmups=3):
    """Measure end-to-end latency of backbone TFLite -> features -> head TFLite.

    Image loading/resizing is done outside the timed region; each measurement
    covers feeding the backbone, invoking it, reading the feature tensor,
    invoking the head and reading the head output.

    Args:
        backbone_path: TFLite backbone model file.
        head_path: TFLite head model file.
        img_paths: images to time; the first one is also used for warm-up.
        warmups: number of untimed warm-up runs.

    Returns:
        dict with keys ``N``, ``avg_ms``, ``std_ms``, ``min_ms``, ``max_ms``.

    Raises:
        ValueError: if *img_paths* is empty.
    """
    img_paths = list(img_paths)
    if not img_paths:
        raise ValueError("time_two_tflites: img_paths is empty; nothing to time.")

    # The feature vector is an intermediate tensor of the backbone; TFLite only
    # guarantees its contents after invoke() when all tensors are preserved.
    b = tf.lite.Interpreter(model_path=backbone_path, experimental_preserve_all_tensors=True)
    b.allocate_tensors()
    bi = b.get_input_details()[0]
    H, W = int(bi["shape"][1]), int(bi["shape"][2])
    feat_idx = _find_feat_idx(b, d_in_guess=1024)

    h = tf.lite.Interpreter(model_path=head_path)
    h.allocate_tensors()
    hi = h.get_input_details()[0]
    ho = h.get_output_details()[0]

    def _infer(a):
        b.set_tensor(bi["index"], a); b.invoke()
        f = b.get_tensor(feat_idx).astype(np.float32).reshape(1, -1)
        h.set_tensor(hi["index"], f); h.invoke()
        # Read the result; get_output_details() alone only returns metadata.
        return h.get_tensor(ho["index"])

    for _ in range(warmups):
        _infer(_prep_img(img_paths[0], W, H, bi["dtype"]))

    times = []
    for p in img_paths:
        a = _prep_img(p, W, H, bi["dtype"])
        t0 = time.perf_counter()
        _infer(a)
        t1 = time.perf_counter()
        times.append((t1 - t0) * 1000.0)

    t = np.array(times, np.float64)
    return dict(N=len(img_paths), avg_ms=round(t.mean(),3),
                std_ms=round(t.std(ddof=1),3) if len(t)>1 else 0.0,
                min_ms=round(t.min(),3), max_ms=round(t.max(),3))

# sample 10 test images (or all of them when there are fewer)
sample_paths = list(test_df["image_path"])[:10]

# One timing row per exported head.
timing_rows = [
    {
        "Head": name,
        "BackboneTFLite": DENSENET_TFLITE_FP16,
        "HeadTFLite": hpath,
        **time_two_tflites(DENSENET_TFLITE_FP16, hpath, sample_paths, warmups=3),
    }
    for name, hpath in head_paths.items()
]

df_timing = pd.DataFrame(timing_rows)
display(df_timing)
csv_out = os.path.join(SAVE_DIR, f"{TAG}_ALL_heads_latency.csv")
df_timing.to_csv(csv_out, index=False)

print("\n‚úÖ Saved head files & timing CSV in:", SAVE_DIR)
for name, p in head_paths.items():
    print(f" - {name}: {p}")
print(" - timing:", csv_out)


üíæ Head TFLite: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_from_KNN_fused_fp32.tflite | 0.005 MB
üíæ Head TFLite: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_from_SVM_RBF_fused_fp32.tflite | 0.005 MB
üíæ Head TFLite: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_from_RandomForest_fused_fp32.tflite | 0.005 MB


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


Unnamed: 0,Head,BackboneTFLite,HeadTFLite,N,avg_ms,std_ms,min_ms,max_ms
0,KNN,/content/drive/MyDrive/DiabeticProject/tflite/...,/content/drive/MyDrive/DiabeticProject/hybrid_...,10,171.907,138.048,93.949,482.208
1,SVM_RBF,/content/drive/MyDrive/DiabeticProject/tflite/...,/content/drive/MyDrive/DiabeticProject/hybrid_...,10,100.009,6.553,94.999,117.646
2,RandomForest,/content/drive/MyDrive/DiabeticProject/tflite/...,/content/drive/MyDrive/DiabeticProject/hybrid_...,10,102.287,10.931,88.736,128.863



‚úÖ Saved head files & timing CSV in: /content/drive/MyDrive/DiabeticProject/hybrid_models
 - KNN: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_from_KNN_fused_fp32.tflite
 - SVM_RBF: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_from_SVM_RBF_fused_fp32.tflite
 - RandomForest: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_HEAD_from_RandomForest_fused_fp32.tflite
 - timing: /content/drive/MyDrive/DiabeticProject/hybrid_models/DenseNet121_fp16_FE_ALL_heads_latency.csv
