# EEG Data Manipulation
### By: Lela Bones and Adam Jump
The purpose of this journal is to view our dataset of EEG Data and manipulate the data into the format that we want.

For our research, we want to build software that uses a closed-loop system to let a user control a robotic arm with their mind.

### Step One: Reading in the data

In [2]:
import json
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch

# Recordings that have worked well so far:
#   best file   = mac_dude_BlinkTest_1.json
#   second best = a88ac021f60d23d32aec7764199fcfcf64c3784548eedc852c90f6a4baa24f48.json

# Parse the JSON recording. The context manager closes the handle as soon as
# parsing is done (it used to stay open for the life of the kernel).
# NOTE: despite its name, `filename` is the open file object, not a path.
with open('data/mac_dude_BlinkTest_1.json', 'r') as filename:
    data = json.load(filename)

# Keep only the entries stored under the 'patterns' key (the EEG samples).
eeg = np.array(data['patterns'])

# Every pattern carries an 'input' and an 'output' entry. The first two
# patterns are skipped (reason not recorded here -- TODO confirm).
inputs = np.array([p['input'] for p in eeg[2:]])
outputs = np.array([p['output'] for p in eeg[2:]])

# Join inputs and outputs column-wise so each row holds one pattern:
# the input columns first, followed by the output columns.
data = np.append(inputs, outputs, 1)
df = pd.DataFrame(data=data)

# Persist the combined array for the training cells; the handle is closed
# (and therefore flushed) when the block exits.
with open('traindata.pt', 'wb') as out_file:
    torch.save(data, out_file)

In [3]:
# Preview the first rows of the combined input/output table.
# (Only the last expression of the cell was echoed in the recorded output.)
df.head()
# Half of the number of rows in `data`; the recorded result is 6162.0.
data.shape[0]/2

6162.0

### Step Two: Filtering the data


#### Types of EEG Data Filtering
According to a few sources, we believe that an FFT-based filter would be the best. There are a couple of ways to do this; a Finite Impulse Response (FIR) filter is one of them. Using the SciPy library, we decided from this [readthedocs](https://scipy-cookbook.readthedocs.io/items/ApplyFIRFilter.html) that signal.fftconvolve would be the fastest/most stable method of applying an FIR filter to the data.

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
# Sequence model built from two stacked LSTM cells.
# nn.Module is the built-in base class for all neural network models.
class Sequence(nn.Module):
    """Two stacked LSTM cells followed by a linear read-out.

    The model walks over the columns of its input one at a time. Each column
    is fed to the first LSTM cell as a single scalar feature, the second cell
    consumes the first cell's hidden state, and a linear layer maps the second
    hidden state to one output value per step.

    Args:
        hidden_size: Number of hidden units used by both LSTM cells.
    """

    def __init__(self, hidden_size):
        # Initialise nn.Module first so sub-modules get registered.
        super(Sequence, self).__init__()
        self.hidden_size = hidden_size
        # First cell: one scalar input per step -> hidden_size units.
        self.lstm1 = nn.LSTMCell(1, self.hidden_size)
        # Second cell: stacked on the hidden state of the first cell.
        self.lstm2 = nn.LSTMCell(self.hidden_size, self.hidden_size)
        # Linear read-out y = x A^T + b from the second hidden state
        # to a single value.
        self.linear = nn.Linear(self.hidden_size, 1)

    def forward(self, input, future=0):
        """Run the model over ``input`` and optionally keep predicting.

        Args:
            input: 2-D tensor of shape (batch, steps); each column is one step.
            future: Number of extra steps to generate after the input is
                consumed, feeding each prediction back in as the next input.
                Defaults to 0 (no extrapolation).

        Returns:
            Tensor of shape (batch, steps + future) with one value per step.
        """
        outputs = []
        # Zero-initialised hidden/cell states for both cells, one row per
        # batch element. They are created from `input` so they share its
        # dtype and device instead of being pinned to float64 on the CPU.
        state_shape = (input.size(0), self.hidden_size)
        h_t = input.new_zeros(state_shape)
        c_t = input.new_zeros(state_shape)
        h_t2 = input.new_zeros(state_shape)
        c_t2 = input.new_zeros(state_shape)

        # Consume the given sequence one (batch, 1) column at a time.
        for input_t in input.chunk(input.size(1), dim=1):
            h_t, c_t = self.lstm1(input_t, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)
            outputs.append(output)
        # Extrapolate: the previous prediction becomes the next input.
        for _ in range(future):
            h_t, c_t = self.lstm1(output, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)
            outputs.append(output)
        # Stack the per-step (batch, 1) outputs and drop the trailing
        # singleton dimension.
        outputs = torch.stack(outputs, 1).squeeze(2)
        return outputs

In [1]:
# Fix the random seeds so repeated runs start from the same state.
np.random.seed(0)
torch.manual_seed(0)

# Load the array written by the data-preparation cell and split it by rows:
# rows from 6162 onwards are used for training, the first 6162 rows for
# testing (6162 is half of the row count reported earlier in the notebook).
# The first seven columns are the model input and column 8 is the target;
# column 7 is not used.
# NOTE(review): recent PyTorch releases default torch.load to
# weights_only=True, which may refuse this pickled NumPy array -- confirm
# against the installed version.
data = torch.load('traindata.pt')
input = torch.from_numpy(data[6162:, :7])
target = torch.from_numpy(data[6162:, 8])
test_input = torch.from_numpy(data[:6162, :7])
test_target = torch.from_numpy(data[:6162, 8])
# NOTE(review): `input` is 2-D (rows, 7) while `target` is 1-D (rows,), and
# the model returns one value per input column. The MSE loss below therefore
# looks like it compares mismatched shapes -- confirm the intended slicing.

# Build the model in double precision.
seq = Sequence(50)
seq.double()
criterion = nn.MSELoss()
# LBFGS is usable here because the whole training set is passed in one batch.
optimizer = optim.LBFGS(seq.parameters(), lr=0.8)

# Number of extra steps predicted past the end of each test sequence.
future = 1000


def closure():
    """Recompute the training loss and its gradients for LBFGS."""
    optimizer.zero_grad()
    # Call the module itself rather than .forward() so registered hooks run,
    # matching how the model is called for the test prediction below.
    out = seq(input)
    loss = criterion(out, target)
    print('loss:', loss.item())
    loss.backward()
    return loss


def draw(yi, color):
    """Plot one predicted sequence: solid over the input range, dotted after."""
    plt.plot(np.arange(input.size(1)), yi[:input.size(1)], color, linewidth = 2.0)
    plt.plot(np.arange(input.size(1), input.size(1) + future), yi[input.size(1):], color + ':', linewidth = 2.0)


# Training loop: one LBFGS step, then a test prediction and a plot.
for i in range(100):
    print('STEP: ', i)
    optimizer.step(closure)
    # Predict on the test rows; no gradients are needed here.
    with torch.no_grad():
        pred = seq(test_input, future=future)
        loss = criterion(pred[:, :-future], test_target)
        print('test loss:', loss.item())
        y = pred.detach().numpy()
    # Draw the first three test sequences and save one PDF per step.
    plt.figure(figsize=(30,10))
    plt.title('Predict future values for time sequences\n(Dashlines are predicted values)', fontsize=30)
    plt.xlabel('x', fontsize=20)
    plt.ylabel('y', fontsize=20)
    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)
    draw(y[0], 'r')
    draw(y[1], 'g')
    draw(y[2], 'b')
    plt.savefig('predict%d.pdf'%i)
    plt.close()
# TODO: save a checkpoint of `seq` (e.g. its state_dict) once training works;
# the earlier commented-out draft referenced undefined names.

NameError: name 'np' is not defined

In [5]:
import torch 
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms


# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters.
# Each image is reshaped later to sequence_length steps of input_size values.
sequence_length, input_size = 28, 28
hidden_size, num_layers, num_classes = 128, 2, 10
batch_size, num_epochs, learning_rate = 100, 2, 0.01

# MNIST train/test splits; only the training split requests a download.
train_dataset = torchvision.datasets.MNIST(
    root='../../data/',
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)
test_dataset = torchvision.datasets.MNIST(
    root='../../data/',
    train=False,
    transform=transforms.ToTensor(),
)

# Batch iterators: training batches are shuffled, test batches keep their order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=batch_size, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, batch_size=batch_size, shuffle=False
)

# Recurrent neural network (many-to-one)
class RNN(nn.Module):
    """Multi-layer LSTM classifier that reads out the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Number of hidden units per LSTM layer.
        num_layers: Number of stacked LSTM layers.
        num_classes: Size of the output produced by the final linear layer.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs/outputs are (batch, seq_length, features).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return one score vector per sequence.

        Args:
            x: Tensor of shape (batch_size, seq_length, input_size).

        Returns:
            Tensor of shape (batch_size, num_classes).
        """
        # Zero initial hidden and cell states, created from `x` so they share
        # its device and dtype. This removes the dependency on a module-level
        # `device` variable and keeps the state on the same device as the data.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)

        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)

        # Decode the hidden state of the last time step
        out = self.fc(out[:, -1, :])
        return out

# Build the network and move its parameters to the selected device.
model = RNN(input_size, hidden_size, num_layers, num_classes)
model = model.to(device)

# Cross-entropy loss on the class scores, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# ---- Training ----
total_step = len(train_loader)
progress_template = 'Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
for epoch in range(num_epochs):
    for step, (images, labels) in enumerate(train_loader, start=1):
        # Treat every image as a sequence of rows before feeding the LSTM.
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)

        # Forward pass and loss.
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Clear stale gradients, back-propagate, update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Report progress every 100 batches.
        if step % 100 != 0:
            continue
        print(progress_template.format(epoch + 1, num_epochs, step, total_step, loss.item()))

# ---- Evaluation (no gradients needed) ----
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        # Index of the highest score along the class dimension.
        predicted = torch.max(outputs.data, 1)[1]
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total))

# Store the trained parameters on disk.
torch.save(model.state_dict(), 'model.ckpt')

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Processing...
Done!
Epoch [1/2], Step [100/600], Loss: 0.5941
Epoch [1/2], Step [200/600], Loss: 0.4275
Epoch [1/2], Step [300/600], Loss: 0.2043
Epoch [1/2], Step [400/600], Loss: 0.1226
Epoch [1/2], Step [500/600], Loss: 0.0515
Epoch [1/2], Step [600/600], Loss: 0.1214
Epoch [2/2], Step [100/600], Loss: 0.0225
Epoch [2/2], Step [200/600], Loss: 0.0461
Epoch [2/2], Step [300/600], Loss: 0.0118
Epoch [2/2], Step [400/600], Loss: 0.0416
Epoch [2/2], Step [500/600], Loss: 0.1680
Epoch [2/2], Step [600/600], Loss: 0.0830
Test Accuracy of the model on the 10000 test images: 97.54 %


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
# Sequence model built from two stacked LSTM cells.
# nn.Module is the built-in base class for all neural network models.
class Sequence(nn.Module):
    """Two stacked LSTM cells followed by a linear read-out.

    The model walks over the columns of its input one at a time. Each column
    is fed to the first LSTM cell as a single scalar feature, the second cell
    consumes the first cell's hidden state, and a linear layer maps the second
    hidden state to one output value per step.

    Args:
        hidden_size: Number of hidden units used by both LSTM cells.
            Defaults to 51, the width that used to be hard-coded.
    """

    def __init__(self, hidden_size=51):
        # Initialise nn.Module first so sub-modules get registered.
        super(Sequence, self).__init__()
        self.hidden_size = hidden_size
        # First cell: one scalar input per step -> hidden_size units.
        self.lstm1 = nn.LSTMCell(1, self.hidden_size)
        # Second cell: stacked on the hidden state of the first cell.
        self.lstm2 = nn.LSTMCell(self.hidden_size, self.hidden_size)
        # Linear read-out y = x A^T + b from the second hidden state
        # to a single value.
        self.linear = nn.Linear(self.hidden_size, 1)

    def forward(self, input, future=0):
        """Run the model over ``input`` and optionally keep predicting.

        Args:
            input: 2-D tensor of shape (batch, steps); each column is one step.
            future: Number of extra steps to generate after the input is
                consumed, feeding each prediction back in as the next input.
                Defaults to 0 (no extrapolation).

        Returns:
            Tensor of shape (batch, steps + future) with one value per step.
        """
        outputs = []
        # Zero-initialised hidden/cell states for both cells, one row per
        # batch element. They are created from `input` so they share its
        # dtype and device instead of being pinned to float64 on the CPU.
        state_shape = (input.size(0), self.hidden_size)
        h_t = input.new_zeros(state_shape)
        c_t = input.new_zeros(state_shape)
        h_t2 = input.new_zeros(state_shape)
        c_t2 = input.new_zeros(state_shape)

        # Consume the given sequence one (batch, 1) column at a time.
        for input_t in input.chunk(input.size(1), dim=1):
            h_t, c_t = self.lstm1(input_t, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)
            outputs.append(output)
        # Extrapolate: the previous prediction becomes the next input.
        for _ in range(future):
            h_t, c_t = self.lstm1(output, (h_t, c_t))
            h_t2, c_t2 = self.lstm2(h_t, (h_t2, c_t2))
            output = self.linear(h_t2)
            outputs.append(output)
        # Stack the per-step (batch, 1) outputs and drop the trailing
        # singleton dimension.
        outputs = torch.stack(outputs, 1).squeeze(2)
        return outputs


if __name__ == '__main__':
    # Fix the random seeds so repeated runs start from the same state.
    np.random.seed(0)
    torch.manual_seed(0)
    # Load the data and build the training/test sets. The first three rows
    # are held out for testing and the rest are used for training. The target
    # is the input shifted by one column, i.e. the next value at every step.
    data = torch.load('traindata.pt')
    input = torch.from_numpy(data[3:, :-1])
    target = torch.from_numpy(data[3:, 1:])
    test_input = torch.from_numpy(data[:3, :-1])
    test_target = torch.from_numpy(data[:3, 1:])
    # Build the model in double precision.
    seq = Sequence()
    seq.double()
    criterion = nn.MSELoss()
    # LBFGS is usable here because the whole training set is passed in one batch.
    optimizer = optim.LBFGS(seq.parameters(), lr=0.8)
    # Training loop: one LBFGS step, then a test prediction and a plot.
    for i in range(15):
        print('STEP: ', i)

        def closure():
            """Recompute the training loss and its gradients for LBFGS."""
            optimizer.zero_grad()
            out = seq(input)
            loss = criterion(out, target)
            print('loss:', loss.item())
            loss.backward()
            return loss

        optimizer.step(closure)
        # Predict on the held-out rows; no gradients are needed here.
        with torch.no_grad():
            future = 1000
            pred = seq(test_input, future=future)
            # Only the part that overlaps the known target is scored.
            loss = criterion(pred[:, :-future], test_target)
            print('test loss:', loss.item())
            y = pred.detach().numpy()
        # Draw the three test sequences and save one PDF per step.
        plt.figure(figsize=(30,10))
        plt.title('Predict future values for time sequences\n(Dashlines are predicted values)', fontsize=30)
        plt.xlabel('x', fontsize=20)
        plt.ylabel('y', fontsize=20)
        plt.xticks(fontsize=20)
        plt.yticks(fontsize=20)

        def draw(yi, color):
            """Plot one sequence: solid over the input range, dotted after."""
            plt.plot(np.arange(input.size(1)), yi[:input.size(1)], color, linewidth = 2.0)
            plt.plot(np.arange(input.size(1), input.size(1) + future), yi[input.size(1):], color + ':', linewidth = 2.0)

        draw(y[0], 'r')
        draw(y[1], 'g')
        draw(y[2], 'b')
        plt.savefig('predict%d.pdf'%i)
        # Close this step's figure inside the loop; closing only once after
        # the loop left every earlier figure open.
        plt.close()
    # TODO: save a checkpoint of `seq` (e.g. its state_dict) after training;
    # the earlier commented-out draft referenced undefined names.