In [10]:
# ============================================================
# Multi-Frame AMC + Curriculum Learning (FINAL - ONE CELL)
# ============================================================

!pip install numpy tensorflow scikit-learn

import os, pickle
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split



In [11]:
# ----------------------------
# Reproducibility
# ----------------------------
np.random.seed(42)
tf.random.set_seed(42)

In [12]:
# ----------------------------
# Parameters
# ----------------------------
K = 5                 # number of frames
BATCH_SIZE = 256

In [13]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
print("TF:", tf.__version__)
print("GPU:", tf.config.list_physical_devices('GPU'))

TF: 2.19.0
GPU: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [17]:
import zipfile
import os, pickle
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split

zip_path = '/content/drive/MyDrive/MedolasionInPhase/archive.zip'
extract_path = '/content/radioml'
os.makedirs(extract_path, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as z:
    z.extractall(extract_path)

with open(os.path.join(extract_path, 'RML2016.10a_dict.pkl'), 'rb') as f:
    data = pickle.load(f, encoding='latin1')

In [18]:
X, Y, SNR = [], [], []

for (mod, snr), samples in data.items():
    X.append(samples)
    Y.extend([mod]*samples.shape[0])
    SNR.extend([snr]*samples.shape[0])

X = np.vstack(X).astype(np.float32)   # (N, 2, 128)
Y = np.array(Y)
SNR = np.array(SNR)

In [19]:
# ----------------------------
# Normalize (sample-wise power)
# ----------------------------
power = np.sqrt(np.mean(X**2, axis=(1,2), keepdims=True))
X = X / (power + 1e-9)

In [20]:
# ----------------------------
# Build Multi-frame samples
# ----------------------------
def build_multiframe(X, Y, SNR, K):
    Xm, Ym, Sm = [], [], []
    for i in range(len(X)-K):
        Xm.append(X[i:i+K])
        Ym.append(Y[i+K-1])
        Sm.append(SNR[i+K-1])
    return np.array(Xm), np.array(Ym), np.array(Sm)

X, Y, SNR = build_multiframe(X, Y, SNR, K)
X = X[..., np.newaxis]   # (N, K, 2, 128, 1)

In [21]:
# ----------------------------
# Encode labels
# ----------------------------
lb = LabelBinarizer()
Y_enc = lb.fit_transform(Y)
num_classes = Y_enc.shape[1]

In [22]:
# ----------------------------
# Train / Val / Test split
# ----------------------------
X_train, X_test, Y_train, Y_test, SNR_train, SNR_test = train_test_split(
    X, Y_enc, SNR, test_size=0.2, random_state=42,
    stratify=Y
)

X_train, X_val, Y_train, Y_val, SNR_train, SNR_val = train_test_split(
    X_train, Y_train, SNR_train, test_size=0.2,
    random_state=42, stratify=np.argmax(Y_train, axis=1)
)

In [23]:
# ----------------------------
# Temporal Attention
# ----------------------------
class TemporalAttention(layers.Layer):
    def __init__(self):
        super().__init__()
        self.W = layers.Dense(64, activation='tanh')
        self.V = layers.Dense(1)

    def call(self, x):
        score = self.V(self.W(x))
        weights = tf.nn.softmax(score, axis=1)
        return tf.reduce_sum(weights * x, axis=1)

In [24]:
# ----------------------------
# Model
# ----------------------------
def build_model():
    iq = layers.Input(shape=(K,2,128,1))
    snr = layers.Input(shape=(1,))

    cnn = models.Sequential([
        layers.Conv2D(64,(2,3),padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.MaxPooling2D((1,2)),
        layers.Conv2D(128,(1,3),padding='same'),
        layers.BatchNormalization(),
        layers.Activation('relu'),
        layers.MaxPooling2D((1,2)),
        layers.Flatten()
    ])

    x = layers.TimeDistributed(cnn)(iq)

    x = layers.Bidirectional(
        layers.GRU(128, return_sequences=True)
    )(x)

    x = TemporalAttention()(x)

    s = layers.Dense(32, activation='relu')(snr)
    x = layers.Concatenate()([x, s])

    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.4)(x)

    out = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model([iq, snr], out)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-3),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

model = build_model()
model.summary()

In [25]:
# ----------------------------
# Curriculum Learning (3 stages - FIXED)
# ----------------------------
def train_stage(snr_min, snr_max, epochs, lr):
    # ✅ Correct LR update (NO ERROR)
    model.optimizer.learning_rate.assign(lr)

    idx_tr = np.where((SNR_train>=snr_min)&(SNR_train<=snr_max))[0]
    idx_va = np.where((SNR_val>=snr_min)&(SNR_val<=snr_max))[0]

    print(f"Train: {len(idx_tr)} | Val: {len(idx_va)}")

    model.fit(
        [X_train[idx_tr], SNR_train[idx_tr].reshape(-1,1)],
        Y_train[idx_tr],
        validation_data=(
            [X_val[idx_va], SNR_val[idx_va].reshape(-1,1)],
            Y_val[idx_va]
        ),
        epochs=epochs,
        batch_size=BATCH_SIZE,
        verbose=2
    )

print("\nStage 1: High SNR")
train_stage(10, 20, epochs=15, lr=1e-3)

print("\nStage 2: Mid SNR")
train_stage(0, 10, epochs=20, lr=5e-4)

print("\nStage 3: Low SNR")
train_stage(-20, 0, epochs=25, lr=1e-4)


Stage 1: High SNR
Train: 35383 | Val: 8679
Epoch 1/15
139/139 - 23s - 166ms/step - accuracy: 0.4873 - loss: 1.3620 - val_accuracy: 0.7118 - val_loss: 0.6930
Epoch 2/15
139/139 - 11s - 80ms/step - accuracy: 0.7270 - loss: 0.5547 - val_accuracy: 0.7847 - val_loss: 0.3937
Epoch 3/15
139/139 - 11s - 80ms/step - accuracy: 0.7766 - loss: 0.4070 - val_accuracy: 0.8161 - val_loss: 0.3474
Epoch 4/15
139/139 - 21s - 148ms/step - accuracy: 0.8153 - loss: 0.3568 - val_accuracy: 0.8277 - val_loss: 0.3090
Epoch 5/15
139/139 - 11s - 81ms/step - accuracy: 0.8370 - loss: 0.3173 - val_accuracy: 0.8579 - val_loss: 0.2795
Epoch 6/15
139/139 - 11s - 81ms/step - accuracy: 0.8611 - loss: 0.2804 - val_accuracy: 0.8759 - val_loss: 0.2469
Epoch 7/15
139/139 - 11s - 82ms/step - accuracy: 0.8737 - loss: 0.2537 - val_accuracy: 0.8792 - val_loss: 0.2379
Epoch 8/15
139/139 - 11s - 82ms/step - accuracy: 0.8894 - loss: 0.2299 - val_accuracy: 0.8933 - val_loss: 0.2130
Epoch 9/15
139/139 - 11s - 82ms/step - accuracy: 0

In [26]:
# ----------------------------
# Final Test
# ----------------------------
loss, acc = model.evaluate(
    [X_test, SNR_test.reshape(-1,1)],
    Y_test,
    verbose=0
)

print("\nFINAL TEST ACCURACY:", round(acc,4))


FINAL TEST ACCURACY: 0.7194
