In [None]:
!pip install sklearn
!pip install seaborn

In [32]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV3Large
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import pandas as pd
from datetime import datetime
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [None]:
import tensorflow as tf
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [41]:
import os
import numpy as np
import pandas as pd
from PIL import Image

def process_all_datasets(base_path, snr_min, snr_max, window_size, sequence_length):
    X_sequences = []
    y_sequences = []
    for label_name in [chr(i) for i in range(ord("A"), ord("C")+1)]:
        label_path = os.path.join(base_path, label_name)
        if not os.path.exists(label_path):
            print(f"[WARNING] Label {label_name} not found, skipping.")
            continue

        for person_name in os.listdir(label_path):
            person_path = os.path.join(label_path, person_name)
            if not os.path.isdir(person_path):
                continue

            for root, _, files in os.walk(person_path):
                for dataset_file in files:
                    if not dataset_file.endswith(".csv"):
                        continue
                    dataset_path = os.path.join(root, dataset_file)
                    try:
                        df = pd.read_csv(dataset_path)
                        required = {"timestamp", "doppler", "SNR", "x", "y", "z"}
                        if not required.issubset(df.columns):
                            raise ValueError(f"{dataset_path} missing {required}")

                        df["SNR"] = np.clip(df["SNR"], snr_min, snr_max)
                        df["SNR"] = np.log1p(df["SNR"])
                        unique_timestamps = np.sort(df["timestamp"].unique())

                        heatmaps = []

                        for window_idx in range(len(unique_timestamps) - window_size + 1):
                            t_subset = unique_timestamps[window_idx:window_idx+window_size]
                            df_subset = df[df["timestamp"].isin(t_subset)]
                            if df_subset.empty:
                                continue

                            num_bins = 100
                            x_bins = np.linspace(df_subset["x"].min(), df_subset["x"].max(), num_bins)
                            y_bins = np.linspace(df_subset["y"].min(), df_subset["y"].max(), num_bins)
                            z_bins = np.linspace(df_subset["z"].min(), df_subset["z"].max(), num_bins)
                            dop_bins = np.linspace(df_subset["doppler"].min(), df_subset["doppler"].max(), num_bins)

                            def resize(img):
                                img_pil = Image.fromarray(img.astype(np.uint8))
                                img_resized = img_pil.resize((64, 64), Image.LANCZOS)
                                return np.array(img_resized)

                            def make_heatmap(x, y, bx, by):
                                h, _, _ = np.histogram2d(x, y, bins=[bx, by], weights=df_subset["SNR"])
                                hc, _, _ = np.histogram2d(x, y, bins=[bx, by])
                                hc[hc == 0] = 1
                                h /= hc
                                return h.T

                            dr = resize(make_heatmap(df_subset["doppler"], df_subset["x"], dop_bins, x_bins))
                            dt = resize(make_heatmap(df_subset["doppler"], df_subset["y"], dop_bins, y_bins))
                            dz = resize(make_heatmap(df_subset["doppler"], df_subset["z"], dop_bins, z_bins))

                            heatmap_rgb = np.stack([dr, dt, dz], axis=-1)
                            heatmap_rgb = (heatmap_rgb - heatmap_rgb.min()) / (heatmap_rgb.max() - 1e-8)
                            heatmaps.append(heatmap_rgb)

                        # Split the full heatmap list into sequences for LSTM
                        for i in range(0, len(heatmaps) - sequence_length + 1):
                            seq = heatmaps[i:i+sequence_length]
                            X_sequences.append(seq)
                            y_sequences.append(ord(label_name) - ord('A'))

                    except Exception as e:
                        print(f"[ERROR] {dataset_path}: {e}")

    return np.array(X_sequences), np.array(y_sequences)

In [54]:
import os
import numpy as np
import pandas as pd
from PIL import Image

# --- SETTINGS ---
SNR_MIN = 4
SNR_MAX = 843
WINDOW_SIZE = 5         # frames per heatmap
SEQUENCE_LENGTH = 30    # heatmaps per sequence
OUTPUT_DIR = "processed_dataset"

LABELS = [chr(ord("A") + i) for i in range(2)]

def resize(img):
    img_pil = Image.fromarray(img.astype(np.uint8))
    return np.array(img_pil.resize((64, 64), Image.LANCZOS))

def save_sequence(X_seq, y_seq, count):
    np.save(f"{OUTPUT_DIR}/X_seq_{count}.npy", X_seq.astype(np.float32))
    np.save(f"{OUTPUT_DIR}/y_seq_{count}.npy", np.array(y_seq))

def process_all(base_path):
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    count = 0

    for label_name in LABELS:
        label_path = os.path.join(base_path, label_name)
        if not os.path.exists(label_path): continue

        for person_name in os.listdir(label_path):
            person_path = os.path.join(label_path, person_name)
            if not os.path.isdir(person_path): continue

            for root, _, files in os.walk(person_path):
                for f in files:
                    if not f.endswith(".csv"): continue
                    df = pd.read_csv(os.path.join(root, f))

                    if not {"timestamp", "doppler", "x", "y", "z", "SNR"}.issubset(df.columns):
                        continue

                    df["SNR"] = np.clip(df["SNR"], SNR_MIN, SNR_MAX)
                    df["SNR"] = np.log1p(df["SNR"])

                    timestamps = np.sort(df["timestamp"].unique())

                    heatmaps = []
                    for i in range(len(timestamps) - WINDOW_SIZE + 1):
                        t_win = timestamps[i:i+WINDOW_SIZE]
                        df_win = df[df["timestamp"].isin(t_win)]

                        def make_map(x, y):
                            h, _, _ = np.histogram2d(x, y, bins=50, weights=df_win["SNR"])
                            c, _, _ = np.histogram2d(x, y, bins=50)
                            c[c == 0] = 1
                            h /= c
                            return resize(h.T)

                        dr = make_map(df_win["doppler"], df_win["x"])
                        dt = make_map(df_win["doppler"], df_win["y"])
                        dz = make_map(df_win["doppler"], df_win["z"])

                        rgb = np.stack([dr, dt, dz], axis=-1)
                        rgb = (rgb - rgb.min()) / (rgb.max() - rgb.min() + 1e-8)

                        heatmaps.append(rgb)

                        if len(heatmaps) == SEQUENCE_LENGTH:
                            X_seq = np.stack(heatmaps, axis=0)
                            label_idx = LABELS.index(label_name)
                            y_seq = [label_idx] * SEQUENCE_LENGTH

                            save_sequence(X_seq, y_seq, count)
                            count += 1
                            heatmaps = []

    print(f"[DONE] Total saved: {count} sequences")

if __name__ == "__main__":
    process_all("dataset 2")

[DONE] Total saved: 60 sequences


In [55]:
import numpy as np
from tensorflow.keras.utils import Sequence

class RadarSequence(Sequence):
    def __init__(self, X_data, y_data, batch_size=4, shuffle=True):
        """
        A robust Keras Sequence for radar heatmap + LSTM training.
        
        Args:
            X_data (np.array): shape (num_samples, seq_len, 64, 64, 3)
            y_data (np.array): shape (num_samples,)
            batch_size (int): batch size
            shuffle (bool): whether to shuffle indexes each epoch
        """
        self.X = X_data
        self.y = y_data
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.X))
        self.on_epoch_end()

    def __len__(self):
        """
        Number of batches per epoch
        """
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, idx):
        """
        Generate one batch
        """
        batch_indexes = self.indexes[idx * self.batch_size : (idx + 1) * self.batch_size]
        X_batch = self.X[batch_indexes]
        y_batch = self.y[batch_indexes]
        return X_batch, y_batch

    def on_epoch_end(self):
        """
        Shuffle indexes after each epoch
        """
        if self.shuffle:
            np.random.shuffle(self.indexes)

In [56]:
from sklearn.model_selection import train_test_split
import glob

# 1️⃣ Get all sequence file names
X_files = sorted(glob.glob(f"{OUTPUT_DIR}/X_seq_*.npy"))
y_files = sorted(glob.glob(f"{OUTPUT_DIR}/y_seq_*.npy"))

print(f"Total sequences found: {len(X_files)}")

# 2️⃣ Split file paths (not data yet!)
X_train_files, X_test_files, y_train_files, y_test_files = train_test_split(
    X_files, y_files, test_size=0.2, random_state=42, shuffle=True
)

print(f"Train sequences: {len(X_train_files)}, Test sequences: {len(X_test_files)}")

Total sequences found: 60
Train sequences: 48, Test sequences: 12


In [60]:
X_train, X_test, y_train, y_test = train_test_split(X_files, y_files, test_size=0.2, random_state=42)
# Tampilkan hasil
print("Total dataset setelah diproses:", len(X_files))
print("Train Files:", len(X_train))
print("Test Files:", len(X_test))
# Cek label unik sebelum dan setelah split
print("Label unik sebelum split:", np.unique(y_files))
print("Label unik di y_train:", np.unique(y_train))
print("Label unik di y_test:", np.unique(y_test))

Total dataset setelah diproses: 60
Train Files: 48
Test Files: 12
Label unik sebelum split: ['processed_dataset\\y_seq_0.npy' 'processed_dataset\\y_seq_1.npy'
 'processed_dataset\\y_seq_10.npy' 'processed_dataset\\y_seq_11.npy'
 'processed_dataset\\y_seq_12.npy' 'processed_dataset\\y_seq_13.npy'
 'processed_dataset\\y_seq_14.npy' 'processed_dataset\\y_seq_15.npy'
 'processed_dataset\\y_seq_16.npy' 'processed_dataset\\y_seq_17.npy'
 'processed_dataset\\y_seq_18.npy' 'processed_dataset\\y_seq_19.npy'
 'processed_dataset\\y_seq_2.npy' 'processed_dataset\\y_seq_20.npy'
 'processed_dataset\\y_seq_21.npy' 'processed_dataset\\y_seq_22.npy'
 'processed_dataset\\y_seq_23.npy' 'processed_dataset\\y_seq_24.npy'
 'processed_dataset\\y_seq_25.npy' 'processed_dataset\\y_seq_26.npy'
 'processed_dataset\\y_seq_27.npy' 'processed_dataset\\y_seq_28.npy'
 'processed_dataset\\y_seq_29.npy' 'processed_dataset\\y_seq_3.npy'
 'processed_dataset\\y_seq_30.npy' 'processed_dataset\\y_seq_31.npy'
 'processed_dat

NameError: name 'X_all' is not defined

In [None]:
!pip install keras-tuner

In [61]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, LSTM, TimeDistributed, Dropout, BatchNormalization
#from radar_loader import RadarSequence

input_shape = (30, 64, 64, 3)  # SEQUENCE, H, W, C

model = Sequential([
    TimeDistributed(Conv2D(32, (3,3), activation='relu', padding='same'), input_shape=input_shape),
    TimeDistributed(MaxPooling2D(2,2)),
    TimeDistributed(Conv2D(64, (3,3), activation='relu', padding='same')),
    TimeDistributed(MaxPooling2D(2,2)),
    TimeDistributed(Flatten()),
    LSTM(128, return_sequences=False),
    Dropout(0.4),
    Dense(3, activation='softmax')
])



model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
model.summary()



In [62]:
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

# 👇 Your DataSequence class must be defined before this, e.g.:
# train_seq = RadarSequence(X_train_files, y_train_files, batch_size=2)
# test_seq = RadarSequence(X_test_files, y_test_files, batch_size=2)

batch_size = 4

train_seq = RadarSequence(X_train, y_train, batch_size=batch_size)
test_seq  = RadarSequence(X_test, y_test, batch_size=batch_size, shuffle=False)
# ✅ Early Stopping
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1
)

# ✅ Fit using the generator — do NOT pass (X_train, y_train) directly!
history = model.fit(
    train_seq,
    validation_data=test_seq,
    epochs=10,
    callbacks=[early_stop],
    verbose=1
)

# ✅ Predict all batches in test_seq
y_true = []
y_pred = []

# --- Predict in ONE shot ---
y_pred_prob = model.predict(test_seq)  # shape: (num_samples, num_classes)

y_pred = np.argmax(y_pred_prob, axis=1)

# --- True labels ---
y_true = []
for _, y_batch in test_seq:
    y_true.extend(y_batch)

y_true = np.array(y_true)

print(classification_report(y_true, y_pred))

cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(12, 4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=["A", "B", "C"],
            yticklabels=["A", "B", "C"])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

# ✅ Train vs Validation Plots
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Train vs Validation Loss')

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Train vs Validation Accuracy')

plt.show()

  self._warn_if_super_not_called()


TypeError: only integer scalar arrays can be converted to a scalar index

In [None]:
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
    verbose=1
)

#!rm -rf ./logs/

logdir="logs/fit/" + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir, histogram_freq=1, write_graph=True)


# Training model
history = tuner.search(
    X_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    class_weight=class_weights,
    validation_data=(X_test, y_test),
    callbacks=[early_stop,tensorboard_callback],
    verbose=1
)



In [None]:
best_trial = tuner.oracle.get_best_trials(num_trials = 1)[0]

print(best_trial.metrics)

In [None]:
!pip install visualkeras 
!pip install --upgrade visualkeras

In [None]:

bestmodel = tuner.get_best_models(1)[0]
bestmodel.save("a-z(30).h5")


In [None]:
best_model = tuner.get_best_models(1)[0]



y_pred = best_model.predict(X_test) 
y_pred_classes = y_pred.argmax(axis=1)  
print(classification_report(y_test, y_pred_classes))
cm = confusion_matrix(y_test, y_pred_classes)

plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['A', 'B', 'C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'],
            yticklabels=['A', 'B', 'C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()

best_hp = tuner.get_best_hyperparameters(1)[0]
best_model = tuner.hypermodel.build(best_hp)

history = best_model.fit(
    X_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    class_weight=class_weights,
    validation_data=(X_test, y_test),
    callbacks=[early_stop,tensorboard_callback],
    verbose=1
)

plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Train vs Validation Loss')
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.title('Train vs Validation Accuracy')

plt.show()

In [None]:
best_mod = tuner.get_best_models(1)[0]

best_mod.save("random.h5")




In [None]:
import visualkeras
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam


# Assuming your model is built using `tuner.get_best_models(1)[0]`
model = tuner.get_best_models(1)[0]



visualkeras.layered_view(model, legend=True, show_dimension=True)

In [None]:
model.save("coba_coba2.h5")