In [1]:
import os

# Torch
import torch
import torchvision
from torch.utils.data import Dataset
from torchvision import datasets
import torch.optim as optim
from torchvision.transforms import ToTensor, Compose, Normalize
from torchvision.datasets import MNIST
import torch.nn.functional as F
import torch.nn as nn
from PIL import Image

from tqdm import tqdm

# Use GPU if available, else use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


## Dataset

In [2]:
# from roboflow import Roboflow
# rf = Roboflow(api_key="WewILBCNfyY6D0eTbQ2j")
# project = rf.workspace("roboflow-universe-projects").project("fall-detection-ca3o8")
# version = project.version(4)
# dataset = version.download("yolov8")


In [3]:
# define dataset from yolov8 annotations
class CustomDataset(Dataset):
  def __init__(self, dir, transform=None, target_transform=None, device=device):
    self.img_dir = f"{dir}/images"
    self.label_dir = f"{dir}/labels"

    # extract file names from the directory, removing the file extension and parent directory
    self.img_names = [os.path.splitext(name)[0] for name in os.listdir(self.img_dir)]

    self.transform = transform
    self.target_transform = target_transform
    self.device = device

  def __len__(self):
    return len(self.img_names)

  def __getitem__(self, idx):
    if isinstance(idx, slice):
      return [self.__getitem__(i) for i in range(*idx.indices(len(self)))]
    
    # construct the file paths for the image and label from the directory, add extension
    img_path = os.path.join(self.img_dir, self.img_names[idx] + ".jpg")
    label_path = os.path.join(self.label_dir, self.img_names[idx] + ".txt")
    image = Image.open(img_path)
    labels = torch.tensor([[float(l) for l in line.rstrip('\n').split()] for line in open(label_path)])
    if self.transform:
      image = self.transform(image)  
    if self.target_transform:
      labels = self.target_transform(labels)
    return image.to(self.device), labels.to(self.device)

In [4]:
dataset = CustomDataset("Fall-Detection-4/train", transform=ToTensor())

In [5]:
dataset[0][0].shape

torch.Size([3, 640, 640])

In [6]:
len(dataset)

9438

In [7]:
# # compute mean and standard deviation of the dataset
# concat = torch.cat([dataset[i][0] for i in range(4000)], 0).to(torch.float32)
# mean = concat.mean()
# std = concat.std()
# mean, std

In [8]:
transform_data = Compose([ToTensor(), Normalize((0.4379,), (0.3040,))])

batch_size = 32
train_dataset = CustomDataset("Fall-Detection-4/train", transform=transform_data)
valid_dataset = CustomDataset("Fall-Detection-4/valid", transform=transform_data)
test_dataset = CustomDataset("Fall-Detection-4/test", transform=transform_data)

In [9]:
N = 1000
train_loader = torch.utils.data.DataLoader(train_dataset[:N], batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset[:N//10], batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset[:N//10], batch_size=batch_size, shuffle=True)

In [10]:
def train(model, train_loader, valid_loader, test_loader, epochs, criterion):
  optimizer = optim.Adam(model.parameters(), lr=0.001)
  for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for batch_idx, (inputs, labels) in tqdm(enumerate(train_loader, 0), total=len(train_loader)):
      inputs, labels = inputs.to(device), labels.to(device)
      optimizer.zero_grad()
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      loss.backward()
      optimizer.step()
      running_loss += loss.item()
      if batch_idx % 10 == 9:
        print(f"[{epoch + 1}, {batch_idx + 1}] train loss: {running_loss / 10}")
        running_loss = 0.0

    model.eval()
    for batch_idx, (inputs, labels) in tqdm(enumerate(valid_loader, 0), total=len(valid_loader)):
      inputs, labels = inputs.to(device), labels.to(device)
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      if batch_idx % 10 == 9:
        print(f"[{epoch + 1}, {batch_idx + 1}] valid loss: {loss.item()}")

  for batch_idx, (inputs, labels) in tqdm(enumerate(test_loader, 0), total=len(test_loader)):
    inputs, labels = inputs.to(device), labels.to(device)
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    if batch_idx % 10 == 9:
      print(f"[{epoch + 1}, {batch_idx + 1}] test loss: {loss.item()}")

  print(f"Final test loss: {loss.item()}")

    
  print("Finished Training")

## Exploring Architectures

### YOLO

In [11]:
class ConvLayer(nn.Module):
  def __init__(self, in_channels, out_channels, kernel_size, stride = 1):
    super(ConvLayer, self).__init__()
    self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, kernel_size // 2)
    self.bn = nn.BatchNorm2d(out_channels)
    self.relu = nn.ReLU()
  
  def forward(self, x):
    return self.relu(self.bn(self.conv(x)))

class ConvBlock(nn.Module):
  def __init__(self, in_channels, downsample_channels, out_channels, kernel_size, stride = 1):
    super(ConvBlock, self).__init__()
    self.conv_layers = nn.Sequential(
      ConvLayer(in_channels, downsample_channels, 1, 1),
      ConvLayer(downsample_channels, out_channels, kernel_size, stride)
    )
  
  def forward(self, x):
    return self.conv_layers(x)

In [12]:
class YOLONet(nn.Module):
  def __init__(self, s=10, b=2, c=20, ch=3, reg_max=16):
    super(YOLONet, self).__init__()

    self.s = s
    self.nc = c
    
    self.nl = ch
    self.reg_max = reg_max
    self.no = c + self.reg_max * 4

    self.args = {
      "box": 7.5, # (float) box loss gain
      "cls": 0.5, # (float) cls loss gain (scale with pixels)
      "dfl": 1.5, # (float) dfl loss gain
    }

    self.conv_layers = nn.Sequential(
      ConvLayer(3, 64, 7, 2),
      nn.MaxPool2d(2, 2),
      ConvLayer(64, 192, 3, 1),
      nn.MaxPool2d(2, 2),
      ConvBlock(192, 128, 256, 3, 1),
      ConvBlock(256, 256, 512, 3, 1),
      nn.MaxPool2d(2, 2),
      ConvBlock(512, 256, 512, 3, 1),
      ConvBlock(512, 256, 512, 3, 1),
      ConvBlock(512, 256, 512, 3, 1),
      ConvBlock(512, 256, 512, 3, 1),
      ConvBlock(512, 512, 1024, 3, 1),
      nn.MaxPool2d(2, 2),
      ConvBlock(1024, 512, 1024, 3, 1),
      ConvBlock(1024, 512, 1024, 3, 1),
    ) 

    self.conv_layers2 = nn.Sequential(
      ConvLayer(1024, 1024, 3, 1),
      ConvLayer(1024, 1024, 3, 2),
      ConvLayer(1024, 1024, 3, 1),
      ConvLayer(1024, 1024, 3, 1),
    )

    self.bbox_head = nn.Sequential(
      ConvLayer(1024, 1024, 3, 1),
      nn.Conv2d(1024, self.reg_max*4, 1, 1)
    )

    self.cls_head = nn.Sequential(
      ConvLayer(1024, 1024, 3, 1),
      nn.Conv2d(1024, self.nc, 1, 1)
    )

    # self.fc_layers = nn.Sequential(
    #   nn.Linear(1024 * s * s, 4096),
    #   nn.ReLU(),
    #   nn.Linear(4096, s * s * reg_max * (b * 5 + c)),
    #   nn.ReLU()
    # )

    self.stride = 16

  def forward(self, x):
    x = self.conv_layers(x)
    x = self.conv_layers2(x)
    x = x.view(x.size(0), -1)
    x = self.fc_layers(x)
    return x
  
class YOLOPretrainer(nn.Module):
  def __init__(self, model: YOLONet, out_features: int):
    super(YOLOPretrainer, self).__init__()
    self.model = model
    self.conv_layers = model.conv_layers

    self.avg_pool = nn.AvgPool2d(2, 2)
    self.fc = nn.Linear(1024 * model.s * model.s, out_features)

  def forward(self, x):
    out = self.conv_layers(x)
    out = self.avg_pool(out)
    out = out.view(out.size(0), -1)
    out = self.fc(out)
    return out

In [13]:
yolo = YOLONet().to(device)


In [14]:
dataset[0][0].shape

torch.Size([3, 640, 640])

In [15]:
dataset[0][1].shape

torch.Size([1, 5])

In [16]:
batch = next(iter(train_loader))
batch[0].shape

torch.Size([32, 3, 640, 640])

In [17]:
out = yolo(batch[0])

In [18]:
out.shape

torch.Size([32, 3000])

In [22]:
out[0].shape[0]

3000

In [20]:
batch[1]

tensor([[[0.0000, 0.5000, 0.7023, 1.0000, 0.5953]],

        [[0.0000, 0.5356, 0.7307, 0.9234, 0.5386]],

        [[0.0000, 0.5249, 0.6539, 0.6240, 0.3719]],

        [[0.0000, 0.3938, 0.6087, 0.4149, 0.4758]],

        [[0.0000, 0.2991, 0.7930, 0.5982, 0.4141]],

        [[0.0000, 0.5900, 0.7293, 0.2819, 0.4563]],

        [[0.0000, 0.5638, 0.7178, 0.8673, 0.5628]],

        [[0.0000, 0.5133, 0.6580, 0.2966, 0.5003]],

        [[0.0000, 0.3506, 0.6865, 0.4622, 0.3761]],

        [[0.0000, 0.6062, 0.8726, 0.6983, 0.2548]],

        [[0.0000, 0.4531, 0.2672, 0.9062, 0.5344]],

        [[0.0000, 0.5960, 0.7137, 0.8079, 0.4078]],

        [[0.0000, 0.4684, 0.5808, 0.1414, 0.3399]],

        [[0.0000, 0.6586, 0.3367, 0.6609, 0.2516]],

        [[0.0000, 0.5693, 0.4363, 0.3457, 0.3337]],

        [[0.0000, 0.6564, 0.7758, 0.4034, 0.3078]],

        [[0.0000, 0.5058, 0.5058, 0.8862, 0.9711]],

        [[0.0000, 0.1162, 0.7102, 0.2324, 0.2703]],

        [[0.0000, 0.2525, 0.7891, 0.4514, 0.39

In [16]:
preds = yolo(dataset[0][0].to(device).unsqueeze(0))

In [18]:
from v8loss import v8DetectionLoss

criterion = v8DetectionLoss(yolo)

In [19]:
train(yolo, train_loader, valid_loader, test_loader, 1, criterion)

  0%|          | 0/32 [00:06<?, ?it/s]


RuntimeError: shape '[3000, 84, -1]' is invalid for input of size 3000

In [81]:
# define the object classification and bounding box regression model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:
# define SSD model
class SSD(nn.Module):
    def __init__(self):
        super(SSD, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:
model = Net().to(device)