# Classification Model with Random Forest & XGBoost for Predicted Status_Spoiled
### Train and compare RandomForest and XGBoost on dataset_kama.csv
### Features: temperature, humidity, gas_level, jenis_makanan
### Target: status

In [None]:
from pathlib import Path
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import joblib

In [None]:

# XGBoost is optional: record whether the classifier could be imported so
# later cells can skip it gracefully.
try:
    from xgboost import XGBClassifier
except Exception:
    has_xgb = False
else:
    has_xgb = True

# locate dataset (robust search)
def find_dataset(name='dataset_kama.csv', max_up=6):
    """Locate ``ai/dataset/<name>`` near the current working directory.

    Search order:
      1. the working directory itself,
      2. up to ``max_up`` of its ancestors (stopping at the filesystem root),
      3. a location derived from this file (two levels above ``__file__``) when
         ``__file__`` is defined, otherwise the plain relative path
         ``ai/dataset/<name>``.

    Args:
        name: File name of the dataset inside ``ai/dataset``.
        max_up: Maximum number of parent directories to climb; negative values
            are treated as 0.

    Returns:
        ``pathlib.Path`` of the first existing candidate.

    Raises:
        FileNotFoundError: If no candidate exists.
    """
    relative = Path('ai') / 'dataset' / name
    cwd = Path.cwd().resolve()

    # ``parents`` already ends at the filesystem root, so slicing it avoids
    # re-checking the root over and over when cwd is shallow.
    for base in [cwd, *cwd.parents][:max(max_up, 0) + 1]:
        candidate = base / relative
        if candidate.exists():
            return candidate

    # Fallback: repository-relative path (notebooks have no __file__).
    if '__file__' in globals():
        script_parents = Path(__file__).resolve().parents
        # Guard the index so a very shallow script location cannot raise
        # IndexError instead of the documented FileNotFoundError.
        candidate = script_parents[min(1, len(script_parents) - 1)] / relative
    else:
        candidate = relative
    if candidate.exists():
        return candidate
    raise FileNotFoundError(f"Could not find {name}")

# Resolve the CSV location; when the search fails, fall back to the path
# relative to the working directory and let read_csv report the problem.
try:
    data_path = find_dataset()
except Exception:
    data_path = Path('ai', 'dataset', 'dataset_kama.csv')

print('Using dataset:', data_path)
df = pd.read_csv(data_path)



Using dataset: D:\naufalarizq\project\kama-smartbox\ai\dataset\dataset_kama.csv


In [None]:
# Map normalized (lower-cased, whitespace-stripped) header names to the real
# column names so lookups tolerate case and stray spaces in the CSV header.
cols_map = {c.lower().strip(): c for c in df.columns}


def get_col(name):
    """Return the dataset's actual column name for *name*, or None if absent."""
    return cols_map.get(name.lower())


# Keep only the expected feature columns that actually exist in the dataset
feat_names = [
    col
    for col in map(get_col, ('temperature', 'humidity', 'gas_level', 'jenis_makanan'))
    if col is not None
]
if not feat_names:
    raise RuntimeError('No feature columns found in dataset (temperature, humidity, gas_level, jenis_makanan)')

target_col = get_col('status')
if target_col is None:
    raise RuntimeError('Target column "status" not found in dataset')

# Rows without a label cannot be used for supervised training
df = df.dropna(subset=[target_col])

# Feature matrix and string-normalized target
X = df[feat_names].copy()
y = df[target_col].astype(str).str.strip()

# Encode class labels as integers; the encoder is saved with each model later
le = LabelEncoder()
y_enc = le.fit_transform(y)
print('Target classes:', le.classes_)

# Stratified 80/20 split keeps the class ratio identical in train and test
X_train, X_test, y_train, y_test = train_test_split(
    X, y_enc, test_size=0.2, random_state=42, stratify=y_enc
)
print('Train/Test shapes:', X_train.shape, X_test.shape)

# Numeric vs categorical split. Normalize exactly like cols_map does
# (lower + strip); otherwise a header such as "temperature " is resolved as a
# feature above but then treated as categorical and one-hot encoded.
numeric_features = [
    c for c in feat_names
    if c.lower().strip() in ('temperature', 'humidity', 'gas_level')
]
cat_features = [c for c in feat_names if c not in numeric_features]

# Numeric columns: mean-impute, then standardize
numeric_transform = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
])
# Categorical columns: fill gaps with a 'missing' category, then one-hot;
# categories unseen during fit are ignored at predict time
cat_transform = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])
preprocessor = ColumnTransformer([
    ('num', numeric_transform, numeric_features),
    ('cat', cat_transform, cat_features),
], remainder='drop')

# Output directory for trained artifacts (relative to the working directory)
models_dir = Path('ai') / 'models'
models_dir.mkdir(parents=True, exist_ok=True)

# Per-model metrics collected by the training cells
results = {}



Train/Test shapes: (2626, 4) (657, 4)


In [None]:
# Random Forest: preprocessing + classifier in one pipeline so the saved
# artifact accepts raw feature rows.
rf_clf = RandomForestClassifier(
    n_estimators=200,
    random_state=42,
    n_jobs=-1,
    class_weight='balanced',
)
rf_pipeline = Pipeline([('pre', preprocessor), ('clf', rf_clf)])

print('\nTraining RandomForest...')
rf_pipeline.fit(X_train, y_train)

# Evaluate on the held-out split
y_pred_rf = rf_pipeline.predict(X_test)
acc_rf = accuracy_score(y_test, y_pred_rf)
print('RandomForest accuracy:', acc_rf)
print(classification_report(y_test, y_pred_rf, target_names=le.classes_))
results['random_forest'] = {'accuracy': acc_rf}

# Persist the fitted pipeline together with the label encoder
rf_model_path = models_dir / 'rf_status_model.pkl'
joblib.dump({'model': rf_pipeline, 'label_encoder': le}, rf_model_path)
print('Saved RandomForest to', rf_model_path)




Training RandomForest...
RandomForest accuracy: 0.9939117199391172
              precision    recall  f1-score   support

         bad       1.00      1.00      1.00       500
        good       0.96      0.98      0.97        56

    accuracy                           0.99       657
   macro avg       0.98      0.98      0.98       657
weighted avg       0.99      0.99      0.99       657

Saved RandomForest to ai\models\rf_status_model.pkl
RandomForest accuracy: 0.9939117199391172
              precision    recall  f1-score   support

         bad       1.00      1.00      1.00       500
        good       0.96      0.98      0.97        56

    accuracy                           0.99       657
   macro avg       0.98      0.98      0.98       657
weighted avg       0.99      0.99      0.99       657

Saved RandomForest to ai\models\rf_status_model.pkl


In [None]:
# XGBoost classifier, trained only when the optional import succeeded
if not has_xgb:
    print('\nXGBoost not available in the environment. Install xgboost to run it: pip install xgboost')
else:
    print('\nTraining XGBoost...')
    xgb_clf = XGBClassifier(
        use_label_encoder=False,
        eval_metric='mlogloss',
        verbosity=0,
        random_state=42,
    )
    xgb_pipeline = Pipeline([('pre', preprocessor), ('clf', xgb_clf)])
    xgb_pipeline.fit(X_train, y_train)

    # Evaluate on the held-out split
    y_pred_xgb = xgb_pipeline.predict(X_test)
    acc_xgb = accuracy_score(y_test, y_pred_xgb)
    print('XGBoost accuracy:', acc_xgb)
    print(classification_report(y_test, y_pred_xgb, target_names=le.classes_))
    results['xgboost'] = {'accuracy': acc_xgb}

    # Persist the fitted pipeline together with the label encoder
    xgb_model_path = models_dir / 'xgb_status_model.pkl'
    joblib.dump({'model': xgb_pipeline, 'label_encoder': le}, xgb_model_path)
    print('Saved XGBoost to', xgb_model_path)





Training XGBoost...
XGBoost accuracy: 0.9939117199391172
              precision    recall  f1-score   support

         bad       1.00      1.00      1.00       500
        good       1.00      0.96      0.98        56

    accuracy                           0.99       657
   macro avg       0.99      0.98      0.99       657
weighted avg       0.99      0.99      0.99       657

Saved XGBoost to ai\models\xgb_status_model.pkl
XGBoost accuracy: 0.9939117199391172
              precision    recall  f1-score   support

         bad       1.00      1.00      1.00       500
        good       1.00      0.96      0.98        56

    accuracy                           0.99       657
   macro avg       0.99      0.98      0.99       657
weighted avg       0.99      0.99      0.99       657

Saved XGBoost to ai\models\xgb_status_model.pkl


In [None]:
# One line per trained model with its collected metrics
print('\nSummary:')
for model_name, metrics in results.items():
    print(model_name, metrics)


Summary:
random_forest {'accuracy': 0.9939117199391172}
xgboost {'accuracy': 0.9939117199391172}


# Predicted Model with Gradient Boosting Regressor / XGBoost Regressor for Predicted Spoiled Date
### Predicted Model with Gradient Boosting Regressor / XGBoost Regressor for Predicted Spoiled Date
### Features: temperature, humidity, gas_level, jenis_makanan
### Target: predicted_spoiled

In [None]:

from pathlib import Path
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import joblib

In [None]:


# XGBoost is optional: record whether the regressor could be imported so
# later cells can skip it gracefully.
try:
    from xgboost import XGBRegressor
except Exception:
    has_xgb = False
else:
    has_xgb = True

# Locate the dataset: first relative to the working directory, then in
# ai/dataset under the working directory or one of its first five ancestors.
data_path = Path('ai') / 'dataset' / 'dataset_kama.csv'
if not data_path.exists():
    start_dir = Path.cwd().resolve()
    for base_dir in [start_dir, *start_dir.parents][:6]:
        cand = base_dir / 'ai' / 'dataset' / 'dataset_kama.csv'
        if cand.exists():
            data_path = cand
            break

print('Using dataset:', data_path)
df = pd.read_csv(data_path)


Using dataset: D:\naufalarizq\project\kama-smartbox\ai\dataset\dataset_kama.csv


In [None]:
# Case/whitespace-insensitive lookup from normalized name to real column name
cols_map = {c.lower().strip(): c for c in df.columns}


def get_col(name):
    """Return the dataset's actual column name for *name*, or None if absent."""
    return cols_map.get(name.lower())


# Keep only the expected feature columns that exist in the dataset
wanted_features = ('temperature', 'humidity', 'gas_level', 'jenis_makanan')
features = [col for col in map(get_col, wanted_features) if col is not None]
if not features:
    raise RuntimeError('Required feature columns not found')

# Accept either spelling of the regression target column
target = get_col('predicted_spoiled') or get_col('predicted_spoil')
if target is None:
    raise RuntimeError('Target column "predicted_spoiled" not found in dataset')

# Rows without a target value are unusable for training
df = df.dropna(subset=[target])

# Coerce the target to numeric; unparseable values become NaN and are
# filtered out of both X and y with the same mask
y = pd.to_numeric(df[target], errors='coerce')
mask_valid = y.notna()
X = df[features].copy().loc[mask_valid]
y = y.loc[mask_valid]

In [None]:
# 80/20 train/test split (fixed seed for reproducibility)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print('Train/Test shapes:', X_train.shape, X_test.shape)

# Numeric vs categorical split. Normalize with lower + strip, the same way the
# column lookup does; otherwise a header such as "temperature " is accepted as
# a feature but then treated as categorical and one-hot encoded.
numeric_features = [
    c for c in features
    if c.lower().strip() in ('temperature', 'humidity', 'gas_level')
]
categorical_features = [c for c in features if c not in numeric_features]

# Numeric columns: mean-impute, then standardize
numeric_transform = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
])
# Categorical columns: fill gaps with a 'missing' category, then one-hot;
# categories unseen during fit are ignored at predict time
cat_transform = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])
preprocessor = ColumnTransformer([
    ('num', numeric_transform, numeric_features),
    ('cat', cat_transform, categorical_features),
], remainder='drop')

# Output directory for trained artifacts (relative to the working directory)
models_dir = Path('ai') / 'models'
models_dir.mkdir(parents=True, exist_ok=True)

# Per-model metrics collected by the training cells
results = {}

Train/Test shapes: (2626, 4) (657, 4)


In [None]:
# Gradient Boosting Regressor: preprocessing + model in a single pipeline
gbr_pipeline = Pipeline([
    ('pre', preprocessor),
    ('reg', GradientBoostingRegressor(random_state=42)),
])

print('\nTraining GradientBoostingRegressor...')
gbr_pipeline.fit(X_train, y_train)
y_pred_gbr = gbr_pipeline.predict(X_test)

# RMSE is derived from MSE by hand so this also works on older sklearn versions
mse_gbr = mean_squared_error(y_test, y_pred_gbr)
rmse_gbr = np.sqrt(mse_gbr)
mae_gbr = mean_absolute_error(y_test, y_pred_gbr)
r2_gbr = r2_score(y_test, y_pred_gbr)
print('GBR RMSE:', f"{rmse_gbr:.4f}", 'MAE:', f"{mae_gbr:.4f}", 'R2:', f"{r2_gbr:.4f}")
results['gbr'] = {'rmse': rmse_gbr, 'mae': mae_gbr, 'r2': r2_gbr}

# Persist the fitted pipeline
gbr_model_path = models_dir / 'gbr_predicted_spoiled.pkl'
joblib.dump({'model': gbr_pipeline}, gbr_model_path)
print('Saved GBR to', gbr_model_path)


Training GradientBoostingRegressor...
GBR RMSE: 0.0311 MAE: 0.0246 R2: 0.9986
Saved GBR to ai\models\gbr_predicted_spoiled.pkl
GBR RMSE: 0.0311 MAE: 0.0246 R2: 0.9986
Saved GBR to ai\models\gbr_predicted_spoiled.pkl


In [None]:
# XGBoost regressor, trained only when the optional import succeeded
if not has_xgb:
    print('\nXGBoost not installed; to run XGBRegressor install xgboost (pip install xgboost)')
else:
    print('\nTraining XGBRegressor...')
    xgb_pipeline = Pipeline([
        ('pre', preprocessor),
        ('reg', XGBRegressor(random_state=42, verbosity=0)),
    ])
    xgb_pipeline.fit(X_train, y_train)
    y_pred_xgb = xgb_pipeline.predict(X_test)

    # Same metric set as the GBR cell so the two models are comparable
    mse_xgb = mean_squared_error(y_test, y_pred_xgb)
    rmse_xgb = np.sqrt(mse_xgb)
    mae_xgb = mean_absolute_error(y_test, y_pred_xgb)
    r2_xgb = r2_score(y_test, y_pred_xgb)
    print('XGB RMSE:', f"{rmse_xgb:.4f}", 'MAE:', f"{mae_xgb:.4f}", 'R2:', f"{r2_xgb:.4f}")
    results['xgb'] = {'rmse': rmse_xgb, 'mae': mae_xgb, 'r2': r2_xgb}

    # Persist the fitted pipeline
    xgb_model_path = models_dir / 'xgb_predicted_spoiled.pkl'
    joblib.dump({'model': xgb_pipeline}, xgb_model_path)
    print('Saved XGB to', xgb_model_path)


Training XGBRegressor...
XGB RMSE: 0.0302 MAE: 0.0238 R2: 0.9986
Saved XGB to ai\models\xgb_predicted_spoiled.pkl
XGB RMSE: 0.0302 MAE: 0.0238 R2: 0.9986
Saved XGB to ai\models\xgb_predicted_spoiled.pkl


In [None]:
# Attach predictions to the held-out rows, show a sample, print the metrics,
# and (best effort) write the full dataset with prediction columns to CSV.
from IPython.display import display

sample = X_test.copy()
sample['actual'] = y_test
sample['pred_gbr'] = gbr_pipeline.predict(X_test)
if has_xgb:
    sample['pred_xgb'] = xgb_pipeline.predict(X_test)

print('\nSample predictions:')
display(sample.head(10))

print('\nSummary metrics:')
for k, v in results.items():
    print(k, v)

# Optionally, add predictions back to the original dataframe and save the
# augmented file. This is deliberately best-effort: a failure here is
# reported but must not abort the notebook.
try:
    df_out = df.copy()
    # Predict on every row of the (target-filtered) dataframe, train rows included
    X_full = df_out[features].copy()
    df_out['pred_gbr'] = gbr_pipeline.predict(X_full)
    if has_xgb:
        df_out['pred_xgb'] = xgb_pipeline.predict(X_full)
    out_path = Path('ai') / 'dataset' / 'dataset_kama_with_reg_preds.csv'
    # The dataset may have been found in an ancestor directory, so the relative
    # ai/dataset folder is not guaranteed to exist under the working directory;
    # create it instead of letting to_csv fail.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df_out.to_csv(out_path, index=False)
    print('Saved augmented dataset with regressor predictions to', out_path)
except Exception as e:
    print('Could not save full predictions:', e)


Sample predictions:


Unnamed: 0,temperature,humidity,gas_level,jenis_makanan,actual,pred_gbr,pred_xgb
1657,23.12,72.72,402.42,fruits,-0.31,-0.283222,-0.319031
1298,19.52,79.55,414.84,fruits,-1.06,-1.09806,-1.062061
2416,22.12,68.58,405.28,fruits,-0.24,-0.284994,-0.28974
1650,23.43,51.53,416.55,fruits,0.61,0.602498,0.617029
2552,28.89,86.32,417.74,fruits,-0.36,-0.365749,-0.405895
2913,28.65,92.79,418.06,fruits,-0.68,-0.663274,-0.703079
2664,26.29,88.11,406.98,fruits,-0.67,-0.682336,-0.724387
238,18.56,72.64,409.11,fruits,-0.83,-0.831124,-0.805767
1242,24.25,54.04,417.13,fruits,0.58,0.640874,0.629211
1641,20.98,91.62,409.77,fruits,-1.42,-1.401223,-1.373072



Summary metrics:
gbr {'rmse': np.float64(0.031070900581085727), 'mae': 0.02461581919976353, 'r2': 0.9985618239683981}
xgb {'rmse': np.float64(0.030218331775201328), 'mae': 0.0238092555284863, 'r2': 0.9986396666850849}
Could not save full predictions: Cannot save file into a non-existent directory: 'ai\dataset'


# Classification Models with a small MLP (1–2 hidden layers, 8–16 neurons), then Convert to .tflite (later xx.c) for the ESP32 Device
### Classification Models with small MLP (1-2 hidden layers, 8-16 neurons) then Convert to .tflite (for ESP32)
### Features: temperature, humidity, gas_level, jenis_makanan
### Target: status

In [None]:
from pathlib import Path
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
import joblib

In [None]:
# Dataset lookup: prefer the Colab upload location, otherwise look for
# ai/dataset under the working directory or one of its first five ancestors.
data_path = Path('/content/dataset_kama.csv')
if not data_path.exists():
    start_dir = Path.cwd().resolve()
    for base_dir in [start_dir, *start_dir.parents][:6]:
        cand = base_dir / 'ai' / 'dataset' / 'dataset_kama.csv'
        if cand.exists():
            data_path = cand
            break

print('Using dataset:', data_path)
df = pd.read_csv(data_path)

Using dataset: /content/dataset_kama.csv


In [None]:
# Mount Google Drive at /content/drive. This requires the google.colab
# package, so the cell only works inside a Colab runtime.
# NOTE(review): no visible cell reads from /content/drive (the dataset is
# loaded from /content/dataset_kama.csv) — confirm this mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Case/whitespace-insensitive lookup from normalized name to real column name
cols_map = {c.lower().strip(): c for c in df.columns}


def get_col(name):
    """Return the dataset's actual column name for *name*, or None if absent."""
    return cols_map.get(name.lower())


# Keep only the expected feature columns that exist in the dataset
wanted_features = ('temperature', 'humidity', 'gas_level', 'jenis_makanan')
features = [col for col in map(get_col, wanted_features) if col is not None]
if not features:
    raise RuntimeError('Required feature columns not found')

target_col = get_col('status')
if target_col is None:
    raise RuntimeError('Target column "status" not found')

# Drop unlabeled rows, then build the feature matrix and normalized target
df = df.dropna(subset=[target_col])
X = df[features].copy()
y = df[target_col].astype(str).str.strip()

# Integer-encode class labels; the encoder is saved for deployment later
le = LabelEncoder()
y_enc = le.fit_transform(y)
print('Target classes:', le.classes_)





In [None]:
# Stratified 80/20 split keeps the class ratio identical in train and test
X_train, X_test, y_train, y_test = train_test_split(
    X, y_enc, test_size=0.2, random_state=42, stratify=y_enc
)
print('Train/Test shapes:', X_train.shape, X_test.shape)

# Numeric vs categorical split. Normalize with lower + strip, the same way the
# column lookup does; otherwise a header such as "temperature " is accepted as
# a feature but then treated as categorical and one-hot encoded.
numeric_features = [
    c for c in features
    if c.lower().strip() in ('temperature', 'humidity', 'gas_level')
]
cat_features = [c for c in features if c not in numeric_features]

# Numeric columns: mean-impute, then standardize
numeric_transform = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
])
# Categorical columns: fill gaps with a 'missing' category, then one-hot;
# categories unseen during fit are ignored at predict time
cat_transform = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
])
preprocessor = ColumnTransformer([
    ('num', numeric_transform, numeric_features),
    ('cat', cat_transform, cat_features),
], remainder='drop')

# The MLP is trained outside a sklearn Pipeline, so fit the preprocessor on the
# training split only and apply the same transform to the test split.
X_train_p = preprocessor.fit_transform(X_train)
X_test_p = preprocessor.transform(X_test)

# Output directory for trained artifacts (relative to the working directory)
models_dir = Path('ai') / 'models'
models_dir.mkdir(parents=True, exist_ok=True)

# Save preprocessor + label encoder for the deployment pipeline
joblib.dump({'preprocessor': preprocessor, 'label_encoder': le}, models_dir / 'mlp_preproc_label.pkl')
print('Saved preprocessor and label encoder to', models_dir / 'mlp_preproc_label.pkl')



Train/Test shapes: (2626, 4) (657, 4)
Saved preprocessor and label encoder to ai/models/mlp_preproc_label.pkl


In [None]:
# Prefer TensorFlow/Keras for the MLP; when it cannot be imported the training
# cell falls back to sklearn's MLPClassifier.
try:
    import tensorflow as tf
    from tensorflow import keras
except Exception as e:
    use_keras = False
    print('TensorFlow not available, falling back to sklearn MLPClassifier. Error:', e)
else:
    use_keras = True
    print('TensorFlow detected. Will train Keras MLP and attempt TFLite conversion.')



TensorFlow detected. Will train Keras MLP and attempt TFLite conversion.


In [None]:
# Train the small MLP classifier(s): Keras when available (with TFLite export),
# otherwise sklearn's MLPClassifier.
from sklearn.metrics import classification_report

if not use_keras:
    # Fallback: sklearn MLPClassifier
    from sklearn.neural_network import MLPClassifier
    print('\nTraining sklearn MLPClassifier (fallback)...')
    mlp = MLPClassifier(hidden_layer_sizes=(16,8), max_iter=500, random_state=42)
    mlp.fit(X_train_p, y_train)
    y_pred = mlp.predict(X_test_p)
    acc = (y_pred == y_test).mean()
    print('sklearn MLP accuracy:', acc)
    print(classification_report(y_test, y_pred, target_names=le.classes_))
    # Preprocessor and label encoder were already saved by the previous cell
    sklearn_model_path = models_dir / 'mlp_sklearn_status_model.pkl'
    joblib.dump({'model': mlp}, sklearn_model_path)
    print('Saved sklearn MLP to', sklearn_model_path)
else:
    # Network dimensions come from the preprocessed features and label set
    input_dim = X_train_p.shape[1]
    num_classes = len(le.classes_)

    def build_mlp(hidden_layers=(16,)):
        """Build and compile a dense softmax classifier.

        hidden_layers gives the number of units of each ReLU hidden layer,
        in order.
        """
        model = keras.Sequential()
        model.add(keras.layers.Input(shape=(input_dim,)))
        for units in hidden_layers:
            model.add(keras.layers.Dense(units, activation='relu'))
        model.add(keras.layers.Dense(num_classes, activation='softmax'))
        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        return model

    # Two architectures to compare: one hidden layer vs. two
    archs = {'mlp_16': (16,), 'mlp_16_8': (16,8)}
    trained_models = {}
    for name, arch in archs.items():
        print('\nTraining', name)
        model = build_mlp(arch)
        # Few epochs because the dataset may be small
        history = model.fit(X_train_p, y_train, epochs=30, batch_size=32, validation_split=0.1, verbose=0)
        loss, acc = model.evaluate(X_test_p, y_test, verbose=0)
        print(f'{name} test accuracy: {acc:.4f}')
        # Class prediction = index of the largest softmax output
        class_probs = model.predict(X_test_p)
        y_pred = np.argmax(class_probs, axis=1)
        print(classification_report(y_test, y_pred, target_names=le.classes_))
        trained_models[name] = model
        # Save the Keras model
        model_path = models_dir / f'{name}_status_model.h5'
        model.save(model_path)
        print('Saved Keras model to', model_path)

    # Wrapper pickle: saved model paths (as strings) plus the label encoder
    saved_model_paths = {n: str(models_dir / f"{n}_status_model.h5") for n in trained_models}
    wrapper = {'models': saved_model_paths, 'label_encoder': le}
    wrapper_file = models_dir / 'mlp_models_wrapper.pkl'
    joblib.dump(wrapper, wrapper_file)
    print('Saved wrapper to', wrapper_file)

    # Convert the last trained model to TFLite with default optimizations (dynamic range)
    try:
        last_model = list(trained_models.values())[-1]
        converter = tf.lite.TFLiteConverter.from_keras_model(last_model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        tflite_model = converter.convert()
        tflite_path = models_dir / 'mlp_status_model.tflite'
        tflite_path.write_bytes(tflite_model)
        print('Saved TFLite model to', tflite_path, 'size (bytes):', tflite_path.stat().st_size)
    except Exception as e:
        print('TFLite conversion failed:', e)

print('\nDone. Models and artifacts are under ai/models/.')


Training mlp_16
mlp_16 test accuracy: 0.9452
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step




              precision    recall  f1-score   support

         bad       0.97      0.99      0.98       500
        good       0.91      0.77      0.83        56

    accuracy                           0.95       657
   macro avg       0.90      0.86      0.88       657
weighted avg       0.94      0.95      0.94       657

Saved Keras model to ai/models/mlp_16_status_model.h5

Training mlp_16_8
mlp_16_8 test accuracy: 0.9802
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step




              precision    recall  f1-score   support

         bad       0.99      0.99      0.99       500
        good       1.00      0.95      0.97        56

    accuracy                           0.98       657
   macro avg       0.97      0.96      0.97       657
weighted avg       0.98      0.98      0.98       657

Saved Keras model to ai/models/mlp_16_8_status_model.h5
Saved wrapper to ai/models/mlp_models_wrapper.pkl
Saved artifact at '/tmp/tmpjcmjm04k'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 4), dtype=tf.float32, name='keras_tensor_4')
Output Type:
  TensorSpec(shape=(None, 3), dtype=tf.float32, name=None)
Captures:
  139582057859984: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139582057863440: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139582057857872: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139582057863824: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139582057

In [None]:
# Quantize Keras MLP to INT8 TFLite and generate C array (.cc/.h) for TFLite Micro (ESP32)
# Requires TensorFlow installed. Uses representative dataset from preprocessor + dataset.

from pathlib import Path
import joblib
import numpy as np
import pandas as pd

# Directory holding the trained artifacts (created if missing)
models_dir = Path('ai') / 'models'
models_dir.mkdir(parents=True, exist_ok=True)

# Locate the Keras model to quantize, preferring the wrapper written by the
# training cell.
wrapper_path = models_dir / 'mlp_models_wrapper.pkl'
model_h5_path = None
if wrapper_path.exists():
    try:
        wrapper = joblib.load(wrapper_path)
        # The wrapper maps model name -> saved .h5 path
        if isinstance(wrapper, dict) and 'models' in wrapper:
            # Pick the first listed model; the None default avoids a
            # StopIteration being reported as an unreadable wrapper when the
            # mapping is simply empty.
            # NOTE(review): the training cell converts the *last* trained model
            # to TFLite while this picks the *first* — confirm which is intended.
            first = next(iter(wrapper['models'].values()), None)
            if first is not None:
                model_h5_path = Path(first)
    except Exception as e:
        print('Could not read wrapper:', e)

# Fallback to common filenames. Glob order is filesystem-dependent, so sort to
# make the choice deterministic, and prefer MLP-named models over any other
# *status_model.h5 file.
if model_h5_path is None or not model_h5_path.exists():
    candidates = (
        sorted(models_dir.glob('*mlp*_status_model.h5'))
        or sorted(models_dir.glob('*status_model.h5'))
    )
    if candidates:
        model_h5_path = candidates[-1]

# Quantize the located Keras model to INT8 TFLite (calibrated on preprocessed
# dataset rows when possible) and emit a C array (.h/.cc) for TFLite Micro.
if model_h5_path is None or not model_h5_path.exists():
    print('No Keras .h5 model found in', models_dir, '— quantization skipped. Ensure you trained a Keras MLP and saved .h5')
else:
    print('Found Keras model:', model_h5_path)

    # Load the fitted preprocessor used to build the representative dataset
    preproc_path = models_dir / 'mlp_preproc_label.pkl'
    if not preproc_path.exists():
        print('Preprocessor file not found at', preproc_path, '— cannot create representative dataset; quantization may still work but calibration will be limited')
        preprocessor = None
    else:
        preproc = joblib.load(preproc_path)
        # The artifact is normally a dict with a 'preprocessor' entry; a bare
        # transformer object is accepted as well.
        preprocessor = preproc.get('preprocessor') if hasattr(preproc, 'get') else preproc

    # Locate the dataset for representative samples: the Colab upload path
    # first, then the ai/dataset layout used by the training cells (the legacy
    # content/ folder is kept as a second choice) in the working directory and
    # up to five of its ancestors.
    data_path = Path('/content/dataset_kama.csv')
    if not data_path.exists():
        start_dir = Path.cwd().resolve()
        search_candidates = [
            base_dir / sub_dir / 'dataset_kama.csv'
            for base_dir in [start_dir, *start_dir.parents][:6]
            for sub_dir in (Path('ai') / 'dataset', Path('content'))
        ]
        data_path = next((c for c in search_candidates if c.exists()), data_path)

    df = pd.read_csv(data_path)
    # Case/whitespace-insensitive column lookup
    cols_map = {c.lower().strip(): c for c in df.columns}
    get_col = lambda name: cols_map.get(name.lower())
    features = [get_col('temperature'), get_col('humidity'), get_col('gas_level'), get_col('jenis_makanan')]
    features = [f for f in features if f is not None]

    # Representative (calibration) samples: up to 100 complete feature rows,
    # transformed exactly like the training data.
    max_rep_samples = 100
    rep_samples = None
    if preprocessor is not None:
        Xrep = df[features].dropna().head(max_rep_samples)
        if not Xrep.empty:
            Xrep_p = preprocessor.transform(Xrep)
            # ColumnTransformer may return a scipy sparse matrix when the
            # one-hot block dominates; the converter needs dense arrays.
            if hasattr(Xrep_p, 'toarray'):
                Xrep_p = Xrep_p.toarray()
            # The converter expects float32 inputs
            rep_samples = np.asarray(Xrep_p, dtype=np.float32)

    # Import TensorFlow here and track success explicitly; checking for 'tf' in
    # globals() could be satisfied by a stale import from an earlier cell.
    try:
        import tensorflow as tf
        from tensorflow import keras
        tf_available = True
    except Exception as e:
        print('TensorFlow not available — cannot convert to TFLite here:', e)
        tf_available = False

    if tf_available:
        # Load the Keras model
        try:
            model = keras.models.load_model(str(model_h5_path))
        except Exception as e:
            print('Failed to load Keras model:', e)
            model = None

        if model is not None:
            converter = tf.lite.TFLiteConverter.from_keras_model(model)
            converter.optimizations = [tf.lite.Optimize.DEFAULT]

            if rep_samples is not None:
                def representative_dataset_gen():
                    """Yield one calibration sample (batch of 1) per step."""
                    for i in range(len(rep_samples)):
                        # The converter expects a list of input arrays
                        yield [rep_samples[i:i+1]]
                converter.representative_dataset = representative_dataset_gen
                # Request full-integer (int8) ops and int8 input/output tensors
                converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
                converter.inference_input_type = tf.int8
                converter.inference_output_type = tf.int8
            else:
                print('No representative samples available — converter will use default quantization (may be float or dynamic range).')

            try:
                tflite_model = converter.convert()
                tflite_path = models_dir / 'mlp_status_model_quant.tflite'
                tflite_path.write_bytes(tflite_model)
                print('Saved quantized TFLite model to', tflite_path, 'size:', tflite_path.stat().st_size)

                # Generate C array (.h/.cc) for TFLite Micro
                name = 'mlp_status_model_quant'
                array_name = name + '_tflite'
                header_path = models_dir / f'{name}.h'
                cc_path = models_dir / f'{name}.cc'

                b = tflite_model
                # Header: extern declarations of the byte array and its length
                with open(header_path, 'w', encoding='utf-8') as hf:
                    hf.write('#ifndef MODEL_DATA_H\n')
                    hf.write('#define MODEL_DATA_H\n\n')
                    hf.write('#include <cstdint>\n\n')
                    hf.write(f'extern const unsigned char {array_name}[];\n')
                    hf.write(f'extern const unsigned int {array_name}_len;\n\n')
                    hf.write('#endif // MODEL_DATA_H\n')

                # Source: the model bytes as hex literals, 12 per line
                # NOTE(review): TFLite Micro example model arrays are usually
                # declared with an explicit alignment (alignas) — confirm
                # whether the ESP32 target needs it.
                with open(cc_path, 'w', encoding='utf-8') as cf:
                    cf.write('#include "'+header_path.name+'"\n\n')
                    cf.write(f'const unsigned char {array_name}[] = {{\n')
                    for offset in range(0, len(b), 12):
                        chunk = b[offset:offset+12]
                        line = ', '.join('0x{:02x}'.format(x) for x in chunk)
                        cf.write('  ' + line + ',\n')
                    cf.write('};\n\n')
                    cf.write(f'const unsigned int {array_name}_len = {len(b)};\n')

                print('Generated C files:', header_path, cc_path)

            except Exception as e:
                print('TFLite conversion failed:', e)
        else:
            print('Model could not be loaded; conversion skipped')
    else:
        print('Skipping TFLite conversion: TensorFlow or model not available.')

print('Quantization cell finished.')



Found Keras model: ai/models/mlp_16_status_model.h5
Saved artifact at '/tmp/tmpmcrbrv1m'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 4), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 3), dtype=tf.float32, name=None)
Captures:
  139582012058000: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139582012048400: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139582012052240: TensorSpec(shape=(), dtype=tf.resource, name=None)
  139582012056656: TensorSpec(shape=(), dtype=tf.resource, name=None)
Saved quantized TFLite model to ai/models/mlp_status_model_quant.tflite size: 2624
Generated C files: ai/models/mlp_status_model_quant.h ai/models/mlp_status_model_quant.cc
Quantization cell finished.


