In [5]:
"""
Written by, 
Sriram Ravindran, sriram@ucsd.edu

Original paper - https://arxiv.org/abs/1611.08024

Please reach out to me if you spot an error.
"""

'\nWritten by, \nSriram Ravindran, sriram@ucsd.edu\n\nOriginal paper - https://arxiv.org/abs/1611.08024\n\nPlease reach out to me if you spot an error.\n'

In [1]:

import numpy as np
from sklearn.metrics import roc_auc_score, precision_score, recall_score, accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import torch.optim as optim

<p>Here's the description from the paper</p>
<img src="EEGNet.png" style="width: 700px; float:left;">

In [5]:
print(torch.cuda.is_available())

False


#### Evaluate function returns values of different criteria like accuracy, precision etc. 
In case you face memory overflow issues, use batch size to control how many samples get evaluated at one time. Use a batch_size that is a factor of length of samples. This ensures that you won't miss any samples.

#### Generate random data

##### Data format:
Datatype - float32 (both X and Y) <br>
X.shape - (#samples, 1, #timepoints,  #channels) <br>
Y.shape - (#samples)

#### Run

In [32]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score

# --------------------------
# 1) DEFINE YOUR MODEL
# --------------------------
class EEGNet(nn.Module):
    def __init__(self):
        super(EEGNet, self).__init__()
        self.T = 120
        
        # Layer 1
        self.conv1 = nn.Conv2d(1, 16, (1, 64), padding=0)
        self.batchnorm1 = nn.BatchNorm2d(16, affine=False)

        # Layer 2
        self.padding1 = nn.ZeroPad2d((16, 17, 0, 1))
        self.conv2 = nn.Conv2d(1, 4, (2, 32))
        self.batchnorm2 = nn.BatchNorm2d(4, affine=False)
        self.pooling2 = nn.MaxPool2d(2, 4)
        
        # Layer 3
        self.padding2 = nn.ZeroPad2d((2, 1, 4, 3))
        self.conv3 = nn.Conv2d(4, 4, (8, 4))
        self.batchnorm3 = nn.BatchNorm2d(4, affine=False)
        self.pooling3 = nn.MaxPool2d((2, 4))

        # FC Layer (adjust dimensions as needed)
        self.fc1 = nn.Linear(4 * 2 * 7, 1)  # => 4 channels * 2 * 7

    def forward(self, x):
        # Layer 1
        x = F.elu(self.conv1(x))
        x = self.batchnorm1(x)
        x = F.dropout(x, 0.25)
        x = x.permute(0, 3, 1, 2)  # reorder dims (B, 1, H, W) => (B, W, 16, ?)
        
        # Layer 2
        x = self.padding1(x)
        x = F.elu(self.conv2(x))
        x = self.batchnorm2(x)
        x = F.dropout(x, 0.25)
        x = self.pooling2(x)
        
        # Layer 3
        x = self.padding2(x)
        x = F.elu(self.conv3(x))
        x = self.batchnorm3(x)
        x = F.dropout(x, 0.25)
        x = self.pooling3(x)
        
        # Flatten
        x = x.reshape(x.size(0), -1)
        x = torch.sigmoid(self.fc1(x))
        return x


# --------------------------
# 2) EVALUATE FUNCTION
# --------------------------
def evaluate(model, X, Y, batch_size=64, device='cpu', metrics=["acc"]):
    """
    Evaluates the model on dataset X with labels Y in chunks of `batch_size`.
    Returns a list of metrics in the order specified by `metrics`.
    """
    model.eval()  # Important if you have dropout/batchnorm
    predicted_probs = []
    
    # Forward pass in chunks to avoid OOM errors
    n_samples = len(X)
    for start in range(0, n_samples, batch_size):
        end = min(start + batch_size, n_samples)
        inputs = torch.from_numpy(X[start:end]).float().to(device)
        
        with torch.no_grad():
            preds = model(inputs)            # shape: [batch_size, 1]
        predicted_probs.append(preds.cpu().numpy())
    
    # Concatenate predictions for all batches
    predicted_probs = np.concatenate(predicted_probs, axis=0).ravel()  # shape: (n_samples,)
    
    results = []
    for m in metrics:
        if m == 'acc':
            results.append(accuracy_score(Y, np.round(predicted_probs)))
        elif m == 'auc':
            results.append(roc_auc_score(Y, predicted_probs))
        elif m == 'recall':
            results.append(recall_score(Y, np.round(predicted_probs)))
        elif m == 'precision':
            results.append(precision_score(Y, np.round(predicted_probs)))
        elif m == 'fmeasure':
            precision = precision_score(Y, np.round(predicted_probs))
            recall = recall_score(Y, np.round(predicted_probs))
            if precision + recall > 0:
                f1 = 2 * precision * recall / (precision + recall)
            else:
                f1 = 0
            results.append(f1)
    
    return results


# --------------------------
# 3) TRAINING LOOP
# --------------------------
def train_loop(model, X_train, y_train, X_val, y_val, X_test, y_test,
               criterion, optimizer, device='cpu', epochs=5, batch_size=32):
    """
    A simple training loop with evaluation after each epoch.
    """
    model.to(device)
    
    for epoch in range(epochs):
        model.train()   # set to training mode
        running_loss = 0.0
        n_samples = len(X_train)
        
        for start in range(0, n_samples, batch_size):
            end = min(start + batch_size, n_samples)
            
            inputs = torch.from_numpy(X_train[start:end]).float().to(device)
            labels = torch.from_numpy(y_train[start:end]).float().to(device)
            labels = labels.view(-1, 1)  # make sure shape is [batch_size, 1]
            
            optimizer.zero_grad()
            outputs = model(inputs)   # shape: [batch_size, 1]
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        # Evaluate at the end of each epoch
        params = ["acc", "auc", "fmeasure"]
        train_metrics = evaluate(model, X_train, y_train, batch_size=64, device=device, metrics=params)
        val_metrics = evaluate(model, X_val, y_val, batch_size=64, device=device, metrics=params)
        test_metrics = evaluate(model, X_test, y_test, batch_size=64, device=device, metrics=params)
        
        print(f"\nEpoch {epoch+1}/{epochs}")
        print("Training Loss:", running_loss)
        print("Train:", dict(zip(params, train_metrics)))
        print("Validation:", dict(zip(params, val_metrics)))
        print("Test:", dict(zip(params, test_metrics)))


# --------------------------
# 4) DEMO: SYNTHETIC DATA
# --------------------------
# We'll mimic your random data approach:
X_train = np.random.rand(128, 1, 120, 64).astype('float32')
y_train = np.round(np.random.rand(128).astype('float32'))

X_val = np.random.rand(128, 1, 120, 64).astype('float32')
y_val = np.round(np.random.rand(128).astype('float32'))

X_test = np.random.rand(128, 1, 120, 64).astype('float32')
y_test = np.round(np.random.rand(128).astype('float32'))

# --------------------------
# 5) RUN TRAIN & EVAL
# --------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = EEGNet()
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(net.parameters())

train_loop(net, 
           X_train, y_train, 
           X_val, y_val, 
           X_test, y_test, 
           criterion, optimizer,
           device=device,
           epochs=5,   # run more for a real scenario
           batch_size=32)

# Finally, you can do a separate final evaluation if desired:
final_test_metrics = evaluate(net, X_test, y_test, batch_size=64, device=device, metrics=["acc","auc","fmeasure"])
print("\nFinal Test Metrics:", final_test_metrics)



Epoch 1/5
Training Loss: 3.034483313560486
Train: {'acc': 0.5, 'auc': np.float64(0.5480792757523856), 'fmeasure': np.float64(0.15789473684210525)}
Validation: {'acc': 0.546875, 'auc': np.float64(0.5096059113300493), 'fmeasure': np.float64(0.14705882352941177)}
Test: {'acc': 0.4765625, 'auc': np.float64(0.48229548229548225), 'fmeasure': np.float64(0.10666666666666667)}

Epoch 2/5
Training Loss: 2.9219780564308167
Train: {'acc': 0.484375, 'auc': np.float64(0.47761194029850745), 'fmeasure': np.float64(0.4406779661016949)}
Validation: {'acc': 0.5546875, 'auc': np.float64(0.5105911330049261), 'fmeasure': np.float64(0.5210084033613445)}
Test: {'acc': 0.53125, 'auc': np.float64(0.5047619047619047), 'fmeasure': np.float64(0.5238095238095238)}

Epoch 3/5
Training Loss: 2.9577942490577698
Train: {'acc': 0.53125, 'auc': np.float64(0.5666748226082701), 'fmeasure': np.float64(0.5384615384615385)}
Validation: {'acc': 0.484375, 'auc': np.float64(0.4724137931034483), 'fmeasure': np.float64(0.43103448

In [9]:
from mmpose.apis import init_pose_model, inference_top_down_pose_model

# Example config & checkpoint for a hand pose model
pose_config = 'configs/hand/td-hm_hrnet_w18_256x256.yaml'
pose_checkpoint = 'checkpoints/hrnet_w18.pth'

# Build the model from a config file and a checkpoint file
pose_model = init_pose_model(pose_config, pose_checkpoint, device='cuda:0')

ModuleNotFoundError: No module named 'mmcv'