# In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model, Input, optimizers, regularizers
from tensorflow.keras.applications import MobileNetV2
from sklearn.metrics import mean_absolute_error, r2_score

# Load the dish table, keeping only the id, calorie target and text columns.
# dish_id is forced to str so it compares correctly with the ids read from the
# fold file below (which are always strings); letting pandas infer a numeric
# dtype would silently produce an empty test split.
df = pd.read_csv(
    "../data/nutrition_filtered.csv",
    usecols=[0, 1, 7],
    header=0,
    names=["dish_id", "calories", "text"],
    dtype={"dish_id": str},
)
# Missing descriptions become empty strings so the text pipeline never sees NaN.
df["text"] = df["text"].fillna("")
df["image_path"] = df["dish_id"].apply(lambda x: f"../data/images/{x}.png")

# File containing the ids of the held-out (test) dishes, one per line.
with open("../data/fold_1.txt", encoding="utf-8") as f:
    stripped_lines = (line.strip() for line in f)
    # Skip blank lines so an empty string never ends up as a "dish id".
    test_ids = {dish_id for dish_id in stripped_lines if dish_id}
train_ids = set(df["dish_id"]) - test_ids

is_test = df["dish_id"].isin(test_ids)
train_df = df[~is_test].copy()
test_df = df[is_test].copy()
if test_df.empty:
    # Fail before training rather than after it, when the metrics would crash.
    raise ValueError(
        "No rows of the dataset match the ids in ../data/fold_1.txt; "
        "cannot build a test split."
    )

# The vocabulary is learned from the training split only, so no test text
# leaks into the vectorizer.
text_vectorizer = tf.keras.layers.TextVectorization(output_mode='int', output_sequence_length=20)
text_vectorizer.adapt(train_df["text"])

IMG_SIZE = (224, 224)


def preprocess(inputs, label, augment=True):
    """Turn one (image path, text) example into model-ready tensors.

    Args:
        inputs: tuple ``(path, text)`` of scalar string tensors.
        label: the regression target; returned unchanged.
        augment: when true, apply random flip / brightness / contrast
            jitter. Should be false for evaluation data so predictions
            are deterministic.

    Returns:
        ``((img, text), label)`` where ``img`` is resized to ``IMG_SIZE``
        and scaled to [-1, 1], and ``text`` is the output of
        ``text_vectorizer``.
    """
    path, text = inputs
    img = tf.io.read_file(path)
    img = tf.image.decode_png(img, channels=3)
    if augment:
        img = tf.image.random_flip_left_right(img)
        img = tf.image.random_brightness(img, 0.1)
        img = tf.image.random_contrast(img, 0.9, 1.1)
    img = tf.image.resize(img, IMG_SIZE)
    # The image branch is a MobileNetV2 with ImageNet weights, which expects
    # pixel values in [-1, 1] rather than [0, 1].
    img = img / 127.5 - 1.0
    text = text_vectorizer(text)
    return (img, text), label


def make_dataset(df, batch_size=16, shuffle=True, augment=None):
    """Build a batched, prefetched ``tf.data.Dataset`` from a dataframe.

    Args:
        df: dataframe with ``image_path``, ``text`` and ``calories`` columns.
        batch_size: number of examples per batch.
        shuffle: whether to shuffle the examples.
        augment: whether to apply random image augmentation. ``None``
            (the default) follows ``shuffle``, so a training dataset is
            augmented and an ordered evaluation dataset is not.

    Returns:
        A dataset yielding ``((images, token_ids), calories)`` batches.
    """
    if augment is None:
        augment = shuffle

    paths = df["image_path"].values
    texts = df["text"].values
    labels = df["calories"].values.astype("float32")
    ds = tf.data.Dataset.from_tensor_slices(((paths, texts), labels))
    if shuffle:
        # Shuffle before decoding: the buffer then holds small path/text
        # strings instead of every decoded image of the split.
        ds = ds.shuffle(buffer_size=len(df))
    ds = ds.map(
        lambda inputs, label: preprocess(inputs, label, augment=augment),
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)


train_ds = make_dataset(train_df, shuffle=True, augment=True)
# keep order! Predictions are matched to test_df rows by position, and
# augmentation is disabled so evaluation is deterministic.
val_ds = make_dataset(test_df, shuffle=False, augment=False)

# Defining model variables
vocab_size = len(text_vectorizer.get_vocabulary())
emb_dim = 128
seq_len = 20    # must equal output_sequence_length of text_vectorizer
att_dim = 256   # shared width of the image/text projections used for attention

# Image branch: frozen ImageNet MobileNetV2 used as a feature extractor.
img_input = Input(shape=(*IMG_SIZE, 3))
base_model = MobileNetV2(input_shape=(*IMG_SIZE, 3), include_top=False,
                         weights="imagenet", pooling="avg")
base_model.trainable = False
# training=False keeps the backbone's BatchNorm layers in inference mode.
img_features = base_model(img_input, training=False)

# Text branch: token embeddings plus their mean as a single summary vector.
text_input = Input(shape=(seq_len,), dtype='int32')
text_emb = layers.Embedding(vocab_size, emb_dim)(text_input)
text_features = layers.GlobalAveragePooling1D()(text_emb)

# Cross-attention (image attends to text).
# The query is the single image vector; keys/values are the per-token text
# embeddings. Attending over the token sequence (rather than over one pooled
# text vector) matters: with a single key the softmax weight is always 1 and
# the image query would have no influence on the result.
proj_img = layers.Dense(att_dim)(img_features)
proj_img = layers.Reshape((1, att_dim))(proj_img)   # (batch, 1, att_dim)
proj_text = layers.Dense(att_dim)(text_emb)         # (batch, seq_len, att_dim)
cross_att = layers.MultiHeadAttention(num_heads=4, key_dim=64)(
    query=proj_img, value=proj_text, key=proj_text)

# Fuse the attended text with the raw image and text features, then regress.
fused = layers.Flatten()(cross_att)
combined = layers.Concatenate()([fused, img_features, text_features])
x = layers.Dense(128, activation='relu')(combined)
x = layers.Dropout(0.3)(x)
x = layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(1e-4))(x)
x = layers.Dropout(0.3)(x)
out = layers.Dense(1)(x)

model = Model(inputs=[img_input, text_input], outputs=out)
model.compile(optimizer=optimizers.Adam(), loss='mse', metrics=['mae'])

# Fit the model on the training split (progress output suppressed).
model.fit(train_ds, epochs=10, verbose=0)

# Predict on the held-out split. val_ds is built without shuffling, so the
# i-th prediction lines up with the i-th row of test_df.
raw_preds = model.predict(val_ds, verbose=0)
preds = raw_preds.reshape(-1)
y_true = test_df["calories"].to_numpy(dtype=float)
dish_ids = test_df["dish_id"].to_numpy()

# Aggregate regression metrics over the whole test split.
mae = mean_absolute_error(y_true=y_true, y_pred=preds)
r2 = r2_score(y_true=y_true, y_pred=preds)

# Write one row per dish with its target, prediction and absolute error.
out_df = pd.DataFrame({"dish_id": dish_ids, "y_true": y_true, "y_pred": preds})
out_df["abs_error"] = np.abs(out_df["y_true"] - out_df["y_pred"])
out_df.to_csv("../data/multimodal_errors.csv", index=False)

# Console summary of the run.
print("\n=== Results ===")
print(f"MAE = {mae:.2f} kcal | R² = {r2:.4f}")