In [1]:
import torch
import torchvision.datasets as datasets
from torchvision import transforms
from torch import nn

In [2]:
def get_default_device():
    """Pick GPU if available, else CPU"""

    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')


def to_device(data, device):
    """Move tensor(s) to chosen device"""

    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

In [3]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size, hidden_size,
            num_layers, batch_first=True
        )
        self.fc = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        h = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)

        out, _ = self.lstm(x, (h, c))  
        out = self.fc(out[:, -1, :])
        
        return out

In [4]:
class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""

    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        """Number of batches"""
        return len(self.dl)

In [5]:
# Hyper parameters
learning_rate = 0.001
sequence_length = 28
hidden_size = 128
num_classes = 10
batch_size = 64
input_size = 28
num_layers = 2
num_epochs = 3

device = get_default_device()
device

device(type='cuda')

In [6]:
train_dataset = datasets.MNIST(
    root='./data/',
    train=True, 
    transform=transforms.ToTensor(),
    download=True
)
test_dataset = datasets.MNIST(
    root='./data/',
    train=False,
    transform=transforms.ToTensor()
)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./data/MNIST\raw\train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:16<00:00, 592335.75it/s] 


Extracting ./data/MNIST\raw\train-images-idx3-ubyte.gz to ./data/MNIST\raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST\raw\train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 72040.00it/s]


Extracting ./data/MNIST\raw\train-labels-idx1-ubyte.gz to ./data/MNIST\raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST\raw\t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:01<00:00, 935394.22it/s]


Extracting ./data/MNIST\raw\t10k-images-idx3-ubyte.gz to ./data/MNIST\raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST\raw\t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 1135514.62it/s]

Extracting ./data/MNIST\raw\t10k-labels-idx1-ubyte.gz to ./data/MNIST\raw






In [7]:
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, 
    batch_size=batch_size, 
    shuffle=False
)

In [8]:
model = RNN(input_size, hidden_size, num_layers, num_classes)
to_device(model, device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [9]:
# Train the model
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i + 1) % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(
                epoch + 1, num_epochs, i + 1, total_step, loss.item()
            ))

Epoch [1/3], Step [100/938], Loss: 0.7809
Epoch [1/3], Step [200/938], Loss: 0.6018
Epoch [1/3], Step [300/938], Loss: 0.3771
Epoch [1/3], Step [400/938], Loss: 0.4811
Epoch [1/3], Step [500/938], Loss: 0.2699
Epoch [1/3], Step [600/938], Loss: 0.1900
Epoch [1/3], Step [700/938], Loss: 0.1608
Epoch [1/3], Step [800/938], Loss: 0.1048
Epoch [1/3], Step [900/938], Loss: 0.1226
Epoch [2/3], Step [100/938], Loss: 0.1147
Epoch [2/3], Step [200/938], Loss: 0.0164
Epoch [2/3], Step [300/938], Loss: 0.0946
Epoch [2/3], Step [400/938], Loss: 0.1341
Epoch [2/3], Step [500/938], Loss: 0.0190
Epoch [2/3], Step [600/938], Loss: 0.0783
Epoch [2/3], Step [700/938], Loss: 0.1052
Epoch [2/3], Step [800/938], Loss: 0.1979
Epoch [2/3], Step [900/938], Loss: 0.1434
Epoch [3/3], Step [100/938], Loss: 0.1161
Epoch [3/3], Step [200/938], Loss: 0.1519
Epoch [3/3], Step [300/938], Loss: 0.0318
Epoch [3/3], Step [400/938], Loss: 0.1127
Epoch [3/3], Step [500/938], Loss: 0.0234
Epoch [3/3], Step [600/938], Loss:

In [10]:
# Evaluate the model
model.eval()
with torch.no_grad():
    right = 0
    total = 0
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        right += (predicted == labels).sum().item()

print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * right / total))

Test Accuracy of the model on the 10000 test images: 98.21 %


In [11]:
# Save the model checkpoint
torch.save(model.state_dict(), 'rnn_model.pth')

In [24]:
import os
import cv2
import numpy as np

def load_model(model_path, device):
    model = RNN(input_size, hidden_size, num_layers, num_classes)
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    model.eval()
    return model

def preprocess_image(image_path, device):
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    image.resize((28, 28))
    image = image.astype(np.float32) / 255.0
    image = torch.tensor(image).unsqueeze(0).unsqueeze(0)  # (1, 1, 28, 28)
    image = image.reshape(-1, sequence_length, input_size).to(device)
    return image

def predict(image_path, model, device):
    image = preprocess_image(image_path, device)
    with torch.no_grad():
        outputs = model(image)
        _, predicted = torch.max(outputs.data, 1)
    return predicted.item()

# Prediction on provided image
device = get_default_device()
model = load_model('rnn_model.pth', device)

for img in os.listdir('./prediction_imgs'):
    image_path = f"./prediction_imgs/{img}"
    prediction = predict(image_path, model, device)
    print('Predicted class:', prediction)

# cap = cv2.VideoCapture(0)

# while True:
#     ret, org_frame = cap.read()
#     frame = cv2.cvtColor(org_frame, cv2.COLOR_BGR2GRAY)
#     frame = frame.astype(np.float32) / 255.0
#     frame = torch.tensor(frame).unsqueeze(0).unsqueeze(0)
#     frame = frame.reshape(-1, sequence_length, input_size).to(device)
#     with torch.no_grad():
#         outputs = model(frame)
#         _, predicted = torch.max(outputs.data, 1)
#     print(predicted.item())

#     if cv2.waitKey(25) == ord('q'):
#         break

# cap.release()
# cv2.destroyAllWindows()

Predicted class: 1
Predicted class: 8
Predicted class: 8
Predicted class: 8


In [25]:
import cv2
import torch

def load_model(model_path, device):
    model = RNN(input_size, hidden_size, num_layers, num_classes)
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    model.eval()
    return model

def preprocess_frame(frame, device):
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    frame = cv2.resize(frame, (28, 28))
    frame = frame.astype(np.float32) / 255.0
    frame = torch.tensor(frame).unsqueeze(0).unsqueeze(0)  # (1, 1, 28, 28)
    frame = frame.reshape(-1, sequence_length, input_size).to(device)
    return frame

def predict(frame, model, device):
    frame = preprocess_frame(frame, device)
    with torch.no_grad():
        outputs = model(frame)
        _, predicted = torch.max(outputs.data, 1)
    return predicted.item()

# Load the model
device = get_default_device()
model = load_model('rnn_model.pth', device)

# Start the video capture
cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Make prediction
    prediction = predict(frame, model, device)
    cv2.putText(frame, f'Predicted: {prediction}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # Display the resulting frame
    cv2.imshow('Live Camera', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release the capture
cap.release()
cv2.destroyAllWindows()