In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [6]:
# === EdgeMeter V2 → Docker-safe TFLite exporter (TF 2.19 / Keras 3) ===
# Outputs per model: fp32, fp16, int8_dynamic, int8_full (float32 I/O), BUILTINS-only.
# LSTM is rebuilt as static+unrolled (batch=1) with dropouts=0.0 to avoid TensorList/Flex.
# Manifests are saved under: /content/drive/MyDrive/EdgeMeter_AIv2/data/tflite/{MODEL}/manifest.json

import os, json, glob, io, contextlib
import numpy as np
import tensorflow as tf
from tensorflow import keras

# ----------------- Config -----------------
V2_DIR   = "/content/drive/MyDrive/EdgeMeter_AIv2/data"
X_TEST   = os.path.join(V2_DIR, "X_test_48_12.npy")
OUT_ROOT = os.path.join(V2_DIR, "tflite")
MODELS   = ["LSTM", "TinyFormer", "LiPFormer", "LiteFormer"]

os.makedirs(OUT_ROOT, exist_ok=True)
print("TF:", tf.__version__)

# CPU-only for deterministic conversion
try:
    tf.config.experimental.set_visible_devices([], 'GPU')
    print("GPU hidden (CPU-only).")
except Exception as e:
    print("Note:", e)

# ----------------- Helpers -----------------
def xfeat_dim(x_path: str) -> int:
    x = np.load(x_path, mmap_mode="r")
    if x.ndim != 3 or x.shape[1] != 48:
        raise ValueError(f"Expected (N,48,F) at {x_path}, got {x.shape}")
    return int(x.shape[2])

def find_sources(model_name: str):
    base = f"Final_{model_name}_Model_48_12"
    return {
        "savedmodel": os.path.join(V2_DIR, base) if os.path.isdir(os.path.join(V2_DIR, base)) else "",
        "keras":      os.path.join(V2_DIR, base + ".keras") if os.path.isfile(os.path.join(V2_DIR, base + ".keras")) else "",
        "h5":         os.path.join(V2_DIR, base + ".h5") if os.path.isfile(os.path.join(V2_DIR, base + ".h5")) else "",
        "weights":    next(iter(glob.glob(os.path.join(V2_DIR, "**", "*weights*.h5"), recursive=True)), "")
    }

# --- Robust analyzer: capture stdout OR scan binary; never returns None ---
def analyze_tflite(path: str) -> dict:
    rep_text = ""
    try:
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            _ = tf.lite.experimental.Analyzer.analyze(model_path=path)
        rep_text = buf.getvalue() or ""
    except Exception as e:
        rep_text = f"[analyzer exception] {e}"

    # Fallback: scan flatbuffer for tell-tale strings
    try:
        if not rep_text.strip():
            with open(path, "rb") as fh:
                blob = fh.read()
            rep_text = blob.decode("latin-1", errors="ignore")
    except Exception as e:
        rep_text += f"\n[binary scan skipped: {e}]"

    flags = ["Flex", "SELECT_TF_OPS", "Unsupported ops", "Unsupported operation"]
    has_flex = any(flag in rep_text for flag in flags)
    return {"has_flex": has_flex, "report": rep_text}

def tflite_write(b: bytes, out_path: str) -> float:
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with open(out_path, "wb") as f: f.write(b)
    sz_mb = round(os.path.getsize(out_path) / (1024*1024), 4)
    print(f"  wrote {out_path}  ({sz_mb} MB)")
    return sz_mb

def representative_dataset(x: np.ndarray, n=256):
    n = min(n, x.shape[0])
    for i in range(n):
        yield [x[i:i+1].astype(np.float32)]

def mk_converter_from_savedmodel(path: str):
    conv = tf.lite.TFLiteConverter.from_saved_model(path)
    try: conv._experimental_lower_tensor_list_ops = True
    except: pass
    try: conv.experimental_enable_resource_variables = True
    except: pass
    conv.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]
    conv.allow_custom_ops = False
    return conv

def mk_converter_from_keras(km: keras.Model):
    conv = tf.lite.TFLiteConverter.from_keras_model(km)
    try: conv._experimental_lower_tensor_list_ops = True
    except: pass
    try: conv.experimental_enable_resource_variables = True
    except: pass
    conv.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]
    conv.allow_custom_ops = False
    return conv

# -------- LSTM export-friendly rebuild (static, unrolled) --------
def rebuild_lstm_static_unrolled(units: int, feat_dim: int) -> keras.Model:
    # Static batch=1, unroll=True, dropouts=0.0, implementation=1
    m = keras.Sequential([
        keras.layers.Input(batch_shape=(1, 48, feat_dim)),
        keras.layers.LSTM(
            units,
            activation="tanh",
            recurrent_activation="sigmoid",
            dropout=0.0,
            recurrent_dropout=0.0,
            unroll=True,
            implementation=1,
            stateful=False,
            return_sequences=False
        ),
        keras.layers.Dense(12)
    ], name="LSTM_export_static_unrolled")
    return m

def try_load_keras_model(path: str):
    if not path: return None
    return keras.models.load_model(path, compile=False, safe_mode=False)

# -------- Variant conversion wrappers --------
def convert_variants_from_savedmodel(src_sm_dir: str, out_dir: str, label: str):
    x = np.load(X_TEST, mmap_mode="r")
    manifest = {"model": label, "artifacts": [], "notes": []}

    # fp32
    c = mk_converter_from_savedmodel(src_sm_dir)
    b = c.convert()
    p = os.path.join(out_dir, f"{label}_fp32.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"fp32","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    # fp16
    c = mk_converter_from_savedmodel(src_sm_dir); c.optimizations=[tf.lite.Optimize.DEFAULT]; c.target_spec.supported_types=[tf.float16]
    b = c.convert()
    p = os.path.join(out_dir, f"{label}_fp16.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"fp16","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    # int8 dynamic
    c = mk_converter_from_savedmodel(src_sm_dir); c.optimizations=[tf.lite.Optimize.DEFAULT]
    b = c.convert()
    p = os.path.join(out_dir, f"{label}_int8_dynamic.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"int8_dynamic","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    # int8 full (float32 IO)
    c = mk_converter_from_savedmodel(src_sm_dir)
    c.optimizations=[tf.lite.Optimize.DEFAULT]
    c.representative_dataset = lambda: representative_dataset(x, 256)
    c.inference_input_type  = tf.float32
    c.inference_output_type = tf.float32
    b = c.convert()
    p = os.path.join(out_dir, f"{label}_int8_full.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"int8_full","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    offenders = [a["variant"] for a in manifest["artifacts"] if a["has_flex"]]
    manifest["notes"].append("All variants BUILTINS-only (Docker-safe)." if not offenders
                             else f"WARNING: Flex required for {offenders} → not Docker-safe.")
    with open(os.path.join(out_dir, "manifest.json"), "w") as f: json.dump(manifest, f, indent=2)
    print(json.dumps(manifest, indent=2))
    return manifest

def convert_variants_from_keras(km: keras.Model, out_dir: str, label: str):
    x = np.load(X_TEST, mmap_mode="r")
    manifest = {"model": label, "artifacts": [], "notes": []}

    # fp32
    c = mk_converter_from_keras(km); b = c.convert()
    p = os.path.join(out_dir, f"{label}_fp32.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"fp32","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    # fp16
    c = mk_converter_from_keras(km); c.optimizations=[tf.lite.Optimize.DEFAULT]; c.target_spec.supported_types=[tf.float16]
    b = c.convert()
    p = os.path.join(out_dir, f"{label}_fp16.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"fp16","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    # int8 dynamic
    c = mk_converter_from_keras(km); c.optimizations=[tf.lite.Optimize.DEFAULT]
    b = c.convert()
    p = os.path.join(out_dir, f"{label}_int8_dynamic.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"int8_dynamic","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    # int8 full (float32 IO)
    c = mk_converter_from_keras(km); c.optimizations=[tf.lite.Optimize.DEFAULT]
    c.representative_dataset = lambda: representative_dataset(x, 256)
    c.inference_input_type  = tf.float32
    c.inference_output_type = tf.float32
    b = c.convert()
    p = os.path.join(out_dir, f"{label}_int8_full.tflite"); s = tflite_write(b, p)
    a = analyze_tflite(p); manifest["artifacts"].append({"variant":"int8_full","path":p,"size_mb":s,"has_flex":a["has_flex"]})

    offenders = [a["variant"] for a in manifest["artifacts"] if a["has_flex"]]
    manifest["notes"].append("All variants BUILTINS-only (Docker-safe)." if not offenders
                             else f"WARNING: Flex required for {offenders} → not Docker-safe.")
    with open(os.path.join(out_dir, "manifest.json"), "w") as f: json.dump(manifest, f, indent=2)
    print(json.dumps(manifest, indent=2))
    return manifest

# ----------------- Per-model conversion -----------------
def convert_model(model_name: str):
    print(f"\n=== Converting: {model_name} ===")
    src = find_sources(model_name)
    print("Sources:", src)
    out_dir = os.path.join(OUT_ROOT, model_name)
    os.makedirs(out_dir, exist_ok=True)

    if model_name == "LSTM":
        # Prefer .keras / .h5 to rebuild static+unrolled graph and transfer weights.
        km_base = None
        if src["keras"]:
            km_base = try_load_keras_model(src["keras"])
        elif src["h5"]:
            km_base = try_load_keras_model(src["h5"])

        if km_base is not None:
            feat_dim = xfeat_dim(X_TEST)
            lstm_layers = [l for l in km_base.layers if isinstance(l, keras.layers.LSTM)]
            if not lstm_layers:
                raise RuntimeError("No LSTM layer found in LSTM model file.")
            units = lstm_layers[0].units

            clean = rebuild_lstm_static_unrolled(units, feat_dim)
            # Weight transfer: assumes architecture Input->LSTM->(optional Dropout)->Dense(12)
            clean.set_weights(km_base.get_weights())

            manifest = convert_variants_from_keras(clean, out_dir, label=model_name)

        elif src["savedmodel"]:
            # Try direct from SavedModel; if Flex appears, abort with instructions.
            manifest = convert_variants_from_savedmodel(src["savedmodel"], out_dir, label=model_name)
            offenders = [a["variant"] for a in manifest["artifacts"] if a["has_flex"]]
            if offenders:
                raise RuntimeError(
                    "LSTM SavedModel conversion requires Flex (likely due to dropout/recurrent_dropout>0). "
                    "Provide a .keras/.h5 (or weights) so we can rebuild an export-friendly LSTM "
                    "(dropouts=0.0, static batch=1, unrolled) and reconvert."
                )
        else:
            raise FileNotFoundError("No LSTM artifacts found (.keras/.h5/SavedModel).")

    else:
        # Transformers: prefer SavedModel, else .keras/.h5
        if src["savedmodel"]:
            manifest = convert_variants_from_savedmodel(src["savedmodel"], out_dir, label=model_name)
        elif src["keras"] or src["h5"]:
            km = try_load_keras_model(src["keras"] or src["h5"])
            manifest = convert_variants_from_keras(km, out_dir, label=model_name)
        else:
            raise FileNotFoundError(f"No artifacts found for {model_name}.")

        offenders = [a["variant"] for a in manifest["artifacts"] if a["has_flex"]]
        if offenders:
            raise RuntimeError(
                f"{model_name} conversion introduced Flex in {offenders}. "
                "Check for unsupported activations/layers and keep to TFLite builtins."
            )

# ----------------- Main -----------------
def main():
    _ = xfeat_dim(X_TEST)  # sanity check
    for m in MODELS:
        convert_model(m)
    print("\nAll requested models converted (Flex-free) or raised an actionable error.")

if __name__ == "__main__":
    main()


TF: 2.19.0
GPU hidden (CPU-only).

=== Converting: LSTM ===
Sources: {'savedmodel': '/content/drive/MyDrive/EdgeMeter_AIv2/data/Final_LSTM_Model_48_12', 'keras': '/content/drive/MyDrive/EdgeMeter_AIv2/data/Final_LSTM_Model_48_12.keras', 'h5': '/content/drive/MyDrive/EdgeMeter_AIv2/data/Final_LSTM_Model_48_12.h5', 'weights': '/content/drive/MyDrive/EdgeMeter_AIv2/data/lstm_48_12_trained.weights.h5'}
Saved artifact at '/tmp/tmpp8jhcdt0'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(1, 48, 11), dtype=tf.float32, name='keras_tensor_24')
Output Type:
  TensorSpec(shape=(1, 12), dtype=tf.float32, name=None)
Captures:
  135312549787984: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135312549788752: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135312549786832: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135312549789328: TensorSpec(shape=(), dtype=tf.resource, name=None)
  135312549786064: TensorSpec(shape=(), d



  wrote /content/drive/MyDrive/EdgeMeter_AIv2/data/tflite/LSTM/LSTM_int8_full.tflite  (0.6962 MB)
{
  "model": "LSTM",
  "artifacts": [
    {
      "variant": "fp32",
      "path": "/content/drive/MyDrive/EdgeMeter_AIv2/data/tflite/LSTM/LSTM_fp32.tflite",
      "size_mb": 0.4558,
      "has_flex": false
    },
    {
      "variant": "fp16",
      "path": "/content/drive/MyDrive/EdgeMeter_AIv2/data/tflite/LSTM/LSTM_fp16.tflite",
      "size_mb": 0.3176,
      "has_flex": false
    },
    {
      "variant": "int8_dynamic",
      "path": "/content/drive/MyDrive/EdgeMeter_AIv2/data/tflite/LSTM/LSTM_int8_dynamic.tflite",
      "size_mb": 0.2613,
      "has_flex": false
    },
    {
      "variant": "int8_full",
      "path": "/content/drive/MyDrive/EdgeMeter_AIv2/data/tflite/LSTM/LSTM_int8_full.tflite",
      "size_mb": 0.6962,
      "has_flex": false
    }
  ],
  "notes": [
    "All variants BUILTINS-only (Docker-safe)."
  ]
}

=== Converting: TinyFormer ===
Sources: {'savedmodel': '/conte