# Model Evaluation using PyTorch Processor

1. [Introduction](#Introduction)
2. [Prerequisites](#Prerequisites)
3. [Setup](#Setup)
4. [Dataset](#Dataset)
5. [Build a SageMaker Processing Job](#Build-a-SageMaker-Processing-Job)
    1. [Review Model Evaluation Script](#Model-Evaluation-Scripts)
    2. [Configure Processing Job](#Configure-Processing-Job)
6. [Review Outputs](#Review-Outputs)

## Prerequisites

Download the notebook into your environment; you can run it by simply executing each cell in order. To understand what's happening, you'll need:

- Familiarity with Python and numpy
- Basic familiarity with Amazon S3.
- Basic understanding of Amazon SageMaker.
- Basic familiarity with AWS Command Line Interface (CLI) -- ideally, you should have it set up with credentials to access the AWS account you're running this notebook from.
- SageMaker Studio is preferred for the full UI integration

## Setup

Set up the environment, load the libraries, and define the parameters used throughout the notebook.

Run the cell below to install the additional Python packages this notebook relies on into your kernel. Note that it does not install or upgrade the SageMaker SDK itself.

In [None]:
# Install the extra packages needed to run ../code/evaluation.py locally
# (that script imports xgboost and tqdm).
# NOTE(review): gymnasium, stable-baselines3 and rich are not imported by any
# code visible in this notebook -- confirm they are still required here.
!pip3 install gymnasium
!pip3 install xgboost
!pip3 install stable-baselines3
!pip3 install stable-baselines3[extra]
!pip3 install tqdm
!pip3 install rich

In [None]:
import sagemaker
from sagemaker import get_execution_role
import boto3

# One SageMaker session supplies the region, account id and default bucket
# that the rest of the notebook builds its S3 locations from.
sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
account = sagemaker_session.account_id()
default_bucket = sagemaker_session.default_bucket()

# IAM role passed to the processing job further down.
role = get_execution_role()

# Prefix shared by the S3 keys and the job name created by this notebook.
base_job_prefix = "cloudd-rf"
s3_client = boto3.client("s3")

## Build a SageMaker Processing Job

### Model Evaluation Scripts

In [2]:
%%writefile ../code/evaluation.py
import argparse
import os
import torch
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import xgboost as xgb
import torch
import glob
import pickle
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib.lines import Line2D
from tqdm import tqdm
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from matplotlib.pyplot import figure

import torch.nn as nn
import torch.nn.functional as F

import warnings

# Silence library warnings for the whole evaluation run. The filter has to be
# installed at module level: when it is set inside `warnings.catch_warnings()`
# it is discarded as soon as the `with` block exits, so nothing is suppressed.
warnings.simplefilter("ignore")

# Signal catalogue: one [display name, parameters, index] row per class.
# Only the display name (column 0) is read by the code in this script.
# NOTE(review): the last column looks like the integer class label -- confirm.
SIG_TYPES = [
    ['2-ASK', ['ask', 2], 0],
    ['4-ASK', ['ask', 4], 1],
    ['8-ASK', ['ask', 8], 2],
    ['BPSK', ['psk', 2], 3],
    ['QPSK', ['psk', 4], 4],
    ['16-QAM', ['qam', 16], 5],
    ['Tone', ['constant'], 6],
    ['P-FMCW', ['p_fmcw'], 7],
]
NUM_CLASSES = len(SIG_TYPES)
sig_names = [name for name, *_ in SIG_TYPES]

# Observation lengths in complex samples. 'model' is the fixed input width
# every observation is zero-padded to in load_data; the integer keys give the
# length of the stored observations for each sensor.
OBS_INT = {'model': 2048, 1: 2048, 2: 1024, 3: 512, 4: 256}

# Attribute name of the layer hooked for feature extraction, per sensor.
FC_LAYERS = {1: 'fc3', 2: 'fc1', 3: 'fc3', 4: 'fc2'}

def float_list(arg):
    """Parse a comma-separated string such as ``"0.5,2"`` into a list of floats."""
    return [float(token) for token in arg.split(',')]

def int_list(arg):
    """Parse a comma-separated string such as ``"1,2,3"`` into a list of ints."""
    return [int(token) for token in arg.split(',')]

def parse_args():
    """Parse the evaluation script's command-line options from ``sys.argv``.

    The three path options fall back to the ``SM_CHANNEL_VAL``,
    ``SM_OUTPUT_DIR`` and ``SM_BASE_MODEL_DIR`` environment variables when
    they are not given on the command line.

    Returns:
        The ``(namespace, unrecognised_args)`` pair produced by
        ``parse_known_args``; unknown flags are collected, not rejected.
    """
    int_options = (
        ("--num-sensors", 4),
        ("--batch-size", 1),
        ("--samples-per-batch", 1000),  # CHUNK_SIZE
    )
    path_options = (
        ("--input-path", "SM_CHANNEL_VAL"),
        ("--output-path", "SM_OUTPUT_DIR"),
        ("--model-path", "SM_BASE_MODEL_DIR"),
    )

    cli = argparse.ArgumentParser()
    for flag, fallback in int_options:
        cli.add_argument(flag, type=int, default=fallback)
    for flag, env_name in path_options:
        cli.add_argument(flag, type=str, default=os.getenv(env_name))

    return cli.parse_known_args()

def load_models(num_sensors, batch_size, samples_per_batch, input_path, model_path, device):
    """Load the team models, their validation data and features, the labels and the fused models.

    Args:
        num_sensors: Number of team models; sensors are numbered 1..num_sensors.
        batch_size: Batch size used for each validation dataloader.
        samples_per_batch: Number of observations expected in each IQ data file.
        input_path: Validation data root; sensor ``k`` is read from ``<input_path>/<k>``.
        model_path: Root directory holding ``team/``, ``baseline/fusion_data/``
            and ``rlrfe/fusion_data/``.
        device: Torch device string the team models are loaded onto.

    Returns:
        A dict with three keys:
        ``'team_models'`` maps each sensor number to a dict with ``'model'``,
        ``'dataloader'``, ``'params'`` (trainable parameter count) and
        ``'features'``; ``'fused_models'`` holds the ``'baseline'``, ``'rl'``
        and ``'rfe'`` XGBoost models (the last two also carry ``'feat_idxs'``);
        ``'labels'`` is the array returned by ``load_labels``.

    Raises:
        ValueError: If ``num_sensors`` is less than 1 (there would be no
            dataloader to derive the number of label files from).
    """
    import importlib

    if num_sensors < 1:
        raise ValueError(f"num_sensors must be at least 1, got {num_sensors}")

    print (f"Loading Models")
    models_config = {
        'team_models': {},
        'fused_models': {
            'baseline': {},
            'rl': {},
            'rfe': {}
        }
    }
    for sensor in range(1, num_sensors+1):
        team_entry = {
           'model': None,
           'dataloader': None,
           'params': None,
           'features': None
        }
        models_config['team_models'][sensor] = team_entry

        # Dynamically import each team model class and instantiate model from it
        module = importlib.import_module(f'team{sensor}_model')
        class_ = getattr(module, f'Team{sensor}Model')
        team_model = class_(NUM_CLASSES)
        # NOTE: torch.load unpickles the checkpoint; only load trusted artifacts.
        team_model.load_state_dict(torch.load(f'{model_path}/team/team{sensor}_model.pt', map_location=torch.device(device)))
        team_model.eval()
        team_model.to(device)
        print (f"Loaded Team {sensor} model")
        team_entry['model'] = team_model

        # Get the number of trainable parameters in each of the teams' models
        team_params = sum(p.numel() for p in team_model.parameters() if p.requires_grad)
        # Report the count itself (this used to print the whole model object).
        print(f'# trainable params, Team {sensor}:', team_params)
        team_entry['params'] = team_params

        # Get Dataloaders
        print (f"Loading Dataloader for Model {sensor}")
        num_batches, dataloader = get_dataloaders(sensor, input_path, samples_per_batch, batch_size)
        team_entry['dataloader'] = dataloader

        # Load Features
        print (f"Loading Features for Model {sensor}")
        team_entry['features'] = load_features(team_model, dataloader, FC_LAYERS[sensor], sensor, device)

    # Load Labels. num_batches is the file count of the last sensor processed.
    print ("Loading Labels")
    models_config['labels'] = load_labels(input_path, num_batches)

    # Load regular fused model
    reg_fused_model = xgb.XGBClassifier(tree_method="hist")
    reg_fused_model.load_model(f'{model_path}/baseline/fusion_data/baseline_fused_2tl.json')
    print('loaded baseline fused model')
    models_config['fused_models']['baseline']['model'] = reg_fused_model

    # Load RL fused model
    # NOTE: pickle.load can execute arbitrary code from the file; the feature
    # index files must come from a trusted training run.
    rl_fused_model = xgb.XGBClassifier(tree_method="hist", early_stopping_rounds=2, n_estimators=5)
    rl_fused_model.load_model(f'{model_path}/rlrfe/fusion_data/rl_fused_2tl.json')
    with open(f'{model_path}/rlrfe/fusion_data/rl_feature_idxes_2tl.pkl', 'rb') as f:
        rl_feature_idxes = pickle.load(f)
    print('loaded RL fused model')
    models_config['fused_models']['rl']['model'] = rl_fused_model
    models_config['fused_models']['rl']['feat_idxs'] = rl_feature_idxes

    # Load RFE fused model
    rfe_fused_model = xgb.XGBClassifier(tree_method="hist", early_stopping_rounds=2)
    rfe_fused_model.load_model(f'{model_path}/rlrfe/fusion_data/rfe_fused_2tl.json')
    with open(f'{model_path}/rlrfe/fusion_data/rfe_feature_idxes_2tl.pkl', 'rb') as f:
        rfe_feature_idxes = pickle.load(f)
    print('loaded RFE fused model')
    models_config['fused_models']['rfe']['model'] = rfe_fused_model
    models_config['fused_models']['rfe']['feat_idxs'] = rfe_feature_idxes
    return models_config

def get_num_samples(iq_input_path, samples_per_batch):
    """Count the IQ files under ``<iq_input_path>/iqdata`` and the samples they hold.

    Returns:
        ``(num_batches, num_samples)``: the number of ``example_*.dat`` files
        found, and that count multiplied by ``samples_per_batch``.
    """
    pattern = os.path.join(iq_input_path, "iqdata", "example_*.dat")
    num_batches = sum(1 for _ in glob.iglob(pattern))
    return num_batches, num_batches * samples_per_batch
  
def load_data(channel_path, batch_size, num_batches, num_train_examples, data_obs_int):
    """Read the IQ files of one sensor into an unshuffled DataLoader.

    Files ``<channel_path>/iqdata/example_<k>.dat`` for k = 1..num_batches are
    read as complex64 and split into observations of ``data_obs_int`` samples.
    Each observation is scaled by its peak magnitude, split into a real and an
    imaginary row, and zero-padded to ``OBS_INT['model']`` samples because the
    models have a fixed input size.

    Args:
        channel_path: Directory containing the ``iqdata`` folder.
        batch_size: Batch size of the returned DataLoader.
        num_batches: Number of IQ files to read.
        num_train_examples: Total number of observations to allocate room for.
        data_obs_int: Number of complex samples per stored observation.

    Returns:
        A ``torch.utils.data.DataLoader`` over float32 arrays of shape
        ``(1, 2, OBS_INT['model'])``, in file order.

    Raises:
        ValueError: If ``data_obs_int`` exceeds ``OBS_INT['model']``.
        IndexError: If the files hold more observations than ``num_train_examples``.
    """
    model_obs_int = OBS_INT['model']
    if data_obs_int > model_obs_int:
        raise ValueError(
            f"data_obs_int ({data_obs_int}) exceeds the model input length ({model_obs_int})"
        )

    training_data = np.zeros((num_train_examples, 1, 2, model_obs_int), dtype=np.float32)

    last_index = 0
    for k in range(num_batches):
        # The per-file label CSV is not needed here (labels are loaded by
        # load_labels), so it is no longer read.
        iq_file_name = f"{channel_path}/iqdata/example_{k + 1}.dat"
        iq_data = np.fromfile(iq_file_name, np.csingle)
        iq_data = np.reshape(iq_data, (-1, data_obs_int))  # one row per observation

        num_nans = 0
        for row in iq_data:
            if np.isnan(np.sum(row)):
                # Leave the slot zero-filled but still advance the index so
                # observations stay aligned with their position in the file.
                num_nans += 1
            else:
                peak = np.max(np.abs(row))
                # An all-zero observation has no peak to normalise by; keep it
                # as zeros instead of dividing by zero and producing NaNs.
                iq_norm = row / peak if peak > 0 else row
                # Row 0 is in-phase (real), row 1 is quadrature (imaginary);
                # the columns beyond data_obs_int stay zero as padding.
                training_data[last_index, 0, 0, :data_obs_int] = iq_norm.real
                training_data[last_index, 0, 1, :data_obs_int] = iq_norm.imag
            last_index += 1

        if num_nans > 0:
            print(f'Found {num_nans} rows containing NaNs in {iq_file_name}')

    return torch.utils.data.DataLoader(list(training_data), batch_size=batch_size, shuffle=False)

def get_dataloaders(sensor, input_path, samples_per_batch, batch_size):
    """Build the validation DataLoader for one sensor.

    The sensor's data is read from ``<input_path>/<sensor>`` using the
    observation length registered for it in ``OBS_INT``.

    Returns:
        ``(num_batches, dataloader)`` where ``num_batches`` is the number of
        IQ files found for the sensor.
    """
    sensor_dir = os.path.join(input_path, str(sensor))
    file_count, sample_count = get_num_samples(sensor_dir, samples_per_batch)
    loader = load_data(
        channel_path=sensor_dir,
        batch_size=batch_size,
        num_batches=file_count,
        num_train_examples=sample_count,
        data_obs_int=OBS_INT[sensor],
    )
    return file_count, loader
    
def load_labels(input_path, num_batches):
    """Load the class labels of every validation file into one flat array.

    Labels are taken from the first column of
    ``<input_path>/1/labeldata/example_<k>.csv`` for k = 1..num_batches and
    concatenated in file order.

    Args:
        input_path: Validation data root directory.
        num_batches: Number of label files to read.

    Returns:
        A 1-D int64 numpy array of class indices (empty when ``num_batches`` is 0).
    """
    label_dir = os.path.join(input_path, '1', 'labeldata')
    # Read file k+1 on iteration k. Previously example_1.csv was read every
    # time, so all files after the first were scored against the wrong labels.
    per_file = [
        pd.read_csv(os.path.join(label_dir, f'example_{i + 1}.csv')).iloc[:, 0].to_numpy(dtype=np.int64)
        for i in range(num_batches)
    ]
    if not per_file:
        return np.empty(0, dtype=np.int64)
    return np.concatenate(per_file)

def load_features(team_model, dataloader, layer, sensor, device):
    """Run the model over ``dataloader`` and collect the outputs of one layer.

    A forward hook on ``getattr(team_model, layer)`` records that layer's
    output for every batch.

    Args:
        team_model: Model to run; it must expose ``layer`` as an attribute.
        dataloader: Iterable of input batches (tensors).
        layer: Attribute name of the layer whose output is captured.
        sensor: Unused; kept so existing callers do not break.
        device: Device the input batches are moved to before the forward pass.

    Returns:
        A 2-D torch tensor with one row per captured output vector; the last
        dimension of the layer output is preserved.
    """
    captured = {}

    def hook(module, hook_inputs, output):
        captured['feats'] = output.detach()

    handle = getattr(team_model, layer).register_forward_hook(hook)

    feats_list = []
    try:
        # Feed the IQ data into the model
        with torch.inference_mode():
            for inputs in tqdm(dataloader):
                team_model(inputs.to(device))
                feats_list.append(captured['feats'].cpu().numpy())
    finally:
        # Always detach the hook, even if a forward pass fails, so the model
        # is not left holding a stale hook.
        handle.remove()

    features = torch.tensor(np.concatenate(feats_list))
    return features.reshape(-1, features.shape[-1])

def get_team_model_accuracy(models_config, num_sensors, output_path):
    """Score every team model on the validation data and save its confusion matrix.

    For each sensor 1..num_sensors the model's predictions are compared with
    ``models_config['labels']``; the accuracy is printed and a row-normalised
    confusion matrix is written to
    ``<output_path>/fusion_plots/confusion_matrices/team<sensor>_confusion_matrix.png``.

    Args:
        models_config: Dict with ``'team_models'`` (per-sensor ``'model'`` and
            ``'dataloader'``) and ``'labels'``.
        num_sensors: Number of team models to evaluate.
        output_path: Directory under which the plots are saved.

    Raises:
        ValueError: If a model produces a different number of predictions
            than there are labels.
    """
    output_artifacts_dir = os.path.join(output_path, 'fusion_plots', 'confusion_matrices')
    os.makedirs(output_artifacts_dir, exist_ok=True)
    labels = np.asarray(models_config['labels'])

    for sensor in range(1, num_sensors+1):
        print(f'Evaluating model {sensor} accuracy')
        team = models_config['team_models'][sensor]
        model = team['model']

        # Take the device from the model's own weights rather than relying on
        # a module-level `device` that only exists when run as a script.
        first_param = next(model.parameters(), None)
        model_device = first_param.device if first_param is not None else torch.device('cpu')

        # Get model predictions
        batch_preds = []
        with torch.inference_mode():
            for inputs in tqdm(team['dataloader']):
                scores = model(inputs.to(model_device))
                # One prediction per sample; a flat argmax over the whole
                # batch is only correct when the batch size is 1.
                scores = scores.reshape(inputs.shape[0], -1)
                batch_preds.append(torch.argmax(scores, dim=1).cpu().numpy())
        outputs = np.concatenate(batch_preds) if batch_preds else np.empty(0, dtype=np.int64)
        print(outputs.shape)

        if outputs.shape[0] != labels.shape[0]:
            raise ValueError(
                f'Team {sensor} model produced {outputs.shape[0]} predictions '
                f'for {labels.shape[0]} labels'
            )

        # Create the confusion matrix (rows: true label, columns: prediction)
        conf_mat = np.zeros((NUM_CLASSES, NUM_CLASSES))
        np.add.at(conf_mat, (labels, outputs), 1)

        # Make each row sum to 1; classes with no samples stay all-zero
        # instead of becoming NaN.
        row_sums = conf_mat.sum(axis=1, keepdims=True)
        conf_mat = np.divide(conf_mat, row_sums, out=np.zeros_like(conf_mat), where=row_sums > 0)

        # Get the overall accuracy
        num_correct = np.sum(outputs == labels)
        accuracy = num_correct / len(outputs) if len(outputs) else float('nan')
        print(f'Team {sensor} model validation accuracy:', accuracy)

        # Set up plot
        fig = figure(figsize=(8, 6))
        sns.heatmap(conf_mat, annot=True, xticklabels=sig_names, yticklabels=sig_names, fmt='.2f')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.title(f'Team {sensor} Model Confusion Matrix')
        plt.savefig(f'{output_artifacts_dir}/team{sensor}_confusion_matrix.png')
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

def get_fusion_model_accuracy(models_config, fusion_type, output_path):
    """Score one fused model on the validation features and save its confusion matrix.

    The per-sensor feature tensors are concatenated column-wise. The
    ``'baseline'`` model sees all columns and its prediction is reduced with
    an argmax over axis 1; the other models see only the columns listed in
    their ``'feat_idxs'``. The accuracy is printed and a row-normalised
    confusion matrix is written to
    ``<output_path>/fusion_plots/confusion_matrices/<fusion_type>_confusion_matrix.png``.

    Args:
        models_config: Dict with ``'team_models'``, ``'fused_models'`` and ``'labels'``.
        fusion_type: Key into ``models_config['fused_models']``
            (``'baseline'``, ``'rl'`` or ``'rfe'``).
        output_path: Directory under which the plot is saved.

    Raises:
        ValueError: If the model produces a different number of predictions
            than there are labels.
    """
    print(f'Evaluating {fusion_type} fusion model accuracy')

    output_artifacts_dir = os.path.join(output_path, 'fusion_plots', 'confusion_matrices')
    os.makedirs(output_artifacts_dir, exist_ok=True)

    team_models = models_config['team_models']
    combined_tensor = torch.cat(
        [team_models[sensor]['features'] for sensor in range(1, len(team_models) + 1)],
        dim=1,
    )

    fused = models_config['fused_models'][fusion_type]
    feats = combined_tensor if fusion_type == 'baseline' else combined_tensor[:, fused['feat_idxs']]

    # Predict once and reuse the result for both the accuracy and the
    # confusion matrix.
    preds = fused['model'].predict(feats)
    if fusion_type == 'baseline':
        preds = np.argmax(preds, axis=1)
    preds = np.asarray(preds)

    labels = np.asarray(models_config['labels'])
    if preds.shape[0] != labels.shape[0]:
        raise ValueError(
            f'{fusion_type} fused model produced {preds.shape[0]} predictions '
            f'for {labels.shape[0]} labels'
        )

    num_correct = np.sum(preds == labels)
    accuracy = num_correct / len(preds) if len(preds) else float('nan')
    print(f'{fusion_type} fused model validation accuracy:', accuracy)

    # Confusion matrix (rows: true label, columns: prediction)
    conf_mat = np.zeros((NUM_CLASSES, NUM_CLASSES))
    np.add.at(conf_mat, (labels, preds), 1)

    # Make each row sum to 1; classes with no samples stay all-zero instead
    # of becoming NaN.
    row_sums = conf_mat.sum(axis=1, keepdims=True)
    conf_mat = np.divide(conf_mat, row_sums, out=np.zeros_like(conf_mat), where=row_sums > 0)

    fig = figure(figsize=(8, 6))
    sns.heatmap(conf_mat, annot=True, xticklabels=sig_names, yticklabels=sig_names, fmt='.2f')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.title(f'{fusion_type.upper()} Fused Model Confusion Matrix')
    plt.savefig(f'{output_artifacts_dir}/{fusion_type}_confusion_matrix.png')
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

if __name__ == "__main__":

    args, _ = parse_args()

    # Prefer the GPU when torch can see one.
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print('Using device', device)

    # Load the team models, their data and features, the labels and the fused models.
    models_config = load_models(
        num_sensors=args.num_sensors,
        batch_size=args.batch_size,
        samples_per_batch=args.samples_per_batch,
        input_path=args.input_path,
        model_path=args.model_path,
        device=device,
    )

    # Per-team accuracy and confusion matrices.
    get_team_model_accuracy(models_config, args.num_sensors, args.output_path)

    # Accuracy and confusion matrix for each fused model.
    for fusion_type in ('baseline', 'rl', 'rfe'):
        get_fusion_model_accuracy(models_config, fusion_type, args.output_path)

Overwriting ../code/evaluation.py


### Local Testing of File
Use the cell below to test the script locally before launching a larger job on SageMaker. Make sure to update the file paths and arguments to match the sample data on your local file system.

In [None]:
!python ../code/evaluation.py --num-sensors 4 --batch-size 1 --samples-per-batch 100 --input-path "/root/ClouddRF_Final/cloudd-rf/data/validation" --output-path "/root/ClouddRF_Final/cloudd-rf/output" --model-path "/root/ClouddRF_Final/cloudd-rf/data/models"

### Configure Processing Job

In [None]:
import time

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.pytorch.processing import PyTorchProcessor

# Each run writes to its own output prefix, keyed by the launch time in whole seconds.
timestamp = str(int(time.time()))
output_prefix = f'{base_job_prefix}/evaluation/outputs/{timestamp}'
output_s3_uri = f's3://{default_bucket}/{output_prefix}'
code_location = f's3://{default_bucket}/{base_job_prefix}/evaluation/code'

# UPDATE: S3 location of the validation dataset. Replace the part after
# `preprocess/` with the prefix of your own preprocessing output.
s3_validation_data = f's3://{default_bucket}/{base_job_prefix}/preprocess/outputs/1730252950/validation/'

# UPDATE: prefix (relative to the base job prefix) where the team models are stored.
team_model_prefix = 'training/pipelines-pvbseqe6oz4t-TrainModel-lVGjYR0zh2/output/model/'
s3_team_model_path = f's3://{default_bucket}/{base_job_prefix}/{team_model_prefix}'

# UPDATE: prefix (relative to the base job prefix) where the baseline fusion model is stored.
baseline_fused_model_prefix = 'fusion/baseline/outputs/1730252951/'
s3_baseline_model_path = f's3://{default_bucket}/{base_job_prefix}/{baseline_fused_model_prefix}'

# UPDATE: prefix (relative to the base job prefix) where the RL-RFE fusion model is stored.
rlrfe_fused_model_prefix = 'fusion/rl_rfe/outputs/1730252951/'
s3_rlrfe_model_path = f's3://{default_bucket}/{base_job_prefix}/{rlrfe_fused_model_prefix}'

processing_instance_count = 1
processing_instance_type = "ml.g5.xlarge"

# Container-side paths. evaluation.py reads SM_CHANNEL_VAL, SM_BASE_MODEL_DIR
# and SM_OUTPUT_DIR as the defaults for its path arguments; the other entries
# are used below as download destinations for the model artifacts.
env_vars = {
    "SM_CHANNEL_VAL": "/opt/ml/processing/input/data/validation",
    "SM_TEAM_MODEL_DIR": "/opt/ml/processing/model/team",
    "SM_BASELINE_MODEL_DIR": "/opt/ml/processing/model/baseline",
    "SM_RLRFE_MODEL_DIR": "/opt/ml/processing/model/rlrfe",
    "SM_BASE_MODEL_DIR": "/opt/ml/processing/model",
    "SM_OUTPUT_DIR": "/opt/ml/processing/output",
}

pytorch_processor = PyTorchProcessor(
    framework_version='1.13.1',
    py_version="py39",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env=env_vars,
    base_job_name=f"{base_job_prefix}-evaluation",
    code_location=code_location,
)

In [None]:
# Processing Script Arguments
# Values forwarded to evaluation.py on its command line.
num_sensors = 4  # Number of teams with distinct models
batch_size = 1
chunk_size = 100  # passed as --samples-per-batch

script_options = {
    "--samples-per-batch": chunk_size,
    "--batch-size": batch_size,
    "--num-sensors": num_sensors,
}
# Flatten to [flag, value, flag, value, ...] with every value as a string.
arguments = [token for flag, value in script_options.items() for token in (flag, str(value))]

code = 'evaluation.py'

In [None]:
# Each S3 source is downloaded to the container path stored under the given env_vars key.
input_channels = [
    (s3_validation_data, "SM_CHANNEL_VAL"),
    (s3_team_model_path, "SM_TEAM_MODEL_DIR"),
    (s3_baseline_model_path, "SM_BASELINE_MODEL_DIR"),
    (s3_rlrfe_model_path, "SM_RLRFE_MODEL_DIR"),
]

pytorch_processor.run(
    code=code,
    source_dir='../code',
    arguments=arguments,
    inputs=[
        ProcessingInput(source=s3_uri, destination=env_vars[env_key], s3_data_type='S3Prefix')
        for s3_uri, env_key in input_channels
    ],
    # Everything written to the output directory is uploaded to output_s3_uri.
    outputs=[
        ProcessingOutput(source=env_vars["SM_OUTPUT_DIR"], destination=output_s3_uri),
    ],
)

## Review Outputs

In [None]:
# List the artifacts the processing job uploaded.
# `Prefix` must be an object-key prefix (no "s3://<bucket>/" part), so the key
# prefix `output_prefix` is used here; passing the full `output_s3_uri` matches
# nothing. The paginator also covers listings longer than one response page.
s3_client = boto3.client("s3")
paginator = s3_client.get_paginator("list_objects_v2")

files = []
for page in paginator.paginate(Bucket=default_bucket, Prefix=output_prefix):
    # "Contents" is absent from a page when no keys match.
    files.extend(page.get("Contents", []))

if not files:
    print(f"No objects found under s3://{default_bucket}/{output_prefix}")

for file in files:
    print(f"file_name: {file['Key']}, size: {file['Size']}")