In [1]:
from google.colab import drive
drive.mount('/content/drive')
import os
# List the contents of your shared drives
print(os.listdir('/content/drive/MyDrive'))

Mounted at /content/drive
['Getting started.pdf', 'My account info.zip', 'final revision.gdoc', 'Untitled spreadsheet.gsheet', 'Drivez', 'المقرر حتى 15-3.docx 3rd Sec..pdf', 'Untitled presentation.gslides', 'dynamic  (Revision).pdf', 'Untitled project.gscript', 'Cpp All in One for Dummies - FreePdf-Books.com.pdf', 'Untitled Jam.gjam', 'Abdelrahman.pdf', 'Cutting tool materials and economics of cutting processes.gdoc', 'حق الجنسية (1).docx', 'حق الجنسية.docx', 'Reading Skills.pdf', 'Ohms Law verification (1).pdf', 'Lab Safety.pdf', 'Ohms Law verification.pdf', 'Kirchhof.pdf', 'Superposition and thelenin.pdf', 'circuits 2101645.pdf', 'Writting skills.pdf', 'Section 8', 'AC 1.pdf', 'Notes', 'AC 2.pdf', 'Math 2 (Fall 2021 3rd seamster) ', 'circuits.pdf', 'Lec 07 - Civil Engineering.pdf', 'Lec 08 - Mechanical Engineering.pdf', 'Lec 09 - Electrical Engineering.pdf', '[Witanime.com] JK EP 02 BD-FHD.mp4', 'I Want to thank you for every moment I spend with yo1.pdf', 'I Want to thank you for eve

In [2]:
import zipfile

zip_path = '/content/drive/MyDrive/full_data.zip'
extract_path = '/content/full_data/'

os.makedirs(extract_path, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print("Extraction complete.")

OSError: [Errno 28] No space left on device

In [None]:
import zipfile

zip_path = '/content/drive/MyDrive/val_data.zip'
extract_path = '/content/full_data/validation'

os.makedirs(extract_path, exist_ok=True)

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print("Extraction complete.")

In [4]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image

class GOT10KDataset(Dataset):
    def __init__(self, root_dir, transform=None, output_size=(13, 13)):
        self.root_dir = root_dir
        self.transform = transform
        self.output_size = output_size
        self.samples = self._load_samples()

    def _load_samples(self):
        samples = []
        for folder in os.listdir(self.root_dir):
            folder_path = os.path.join(self.root_dir, folder)
            if os.path.isdir(folder_path):
                frame_paths = sorted([
                    os.path.join(folder_path, img)
                    for img in os.listdir(folder_path)
                    if img.endswith(('.jpg', '.png'))
                ])

                # Ensure we have enough frames to form a sequence
                if len(frame_paths) >= 2:
                    for i in range(len(frame_paths) - 1):
                        template_path = frame_paths[i]
                        search_path = frame_paths[i + 1]

                        # Example classification label with the required output size (H, W)
                        cls_label = torch.zeros(2, *self.output_size)  # Assuming 2 classes and binary label maps
                        cls_label[0, :, :] = 1  # Set the first channel to ones (example)

                        reg_label = torch.tensor([0.5, 0.5, 0.5, 0.5])  # Example regression label
                        samples.append((template_path, search_path, cls_label, reg_label))
        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        template_path, search_path, cls_label, reg_label = self.samples[idx]
        template = Image.open(template_path).convert('RGB')
        search = Image.open(search_path).convert('RGB')

        if self.transform:
            template = self.transform(template)
            search = self.transform(search)

        return template, search, cls_label, reg_label

from torchvision import transforms

data_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

dataset = GOT10KDataset(root_dir='/content/full_data/train', transform=data_transform)



In [5]:
from torch.utils.data import DataLoader, random_split

batch_size = 1
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Conv2d, Linear, Dropout, MultiheadAttention, GroupNorm

class FeatureExtractor(nn.Module):
    def __init__(self):
        super(FeatureExtractor, self).__init__()
        alexnet = torch.hub.load('pytorch/vision:v0.10.0', 'alexnet', pretrained=True)
        self.features = nn.Sequential(*list(alexnet.features.children())[:-1])

    def forward(self, x):
        outputs = []
        for layer in self.features:
            x = layer(x)
            outputs.append(x)
        return outputs[-3:]

class FeatureEncoder(nn.Module):
    def __init__(self, in_channels, out_channels, num_heads=8):
        super(FeatureEncoder, self).__init__()
        self.conv = Conv2d(in_channels, out_channels, kernel_size=1)
        self.positional_encoding = nn.Parameter(torch.randn(1, out_channels, 1, 1))
        self.multihead_attn = MultiheadAttention(embed_dim=out_channels, num_heads=num_heads)
        self.norm = GroupNorm(8, out_channels)

    def forward(self, x1, x2):
        x1 = self.conv(x1)
        x2 = self.conv(x2)
        x1 += self.positional_encoding
        x2 += self.positional_encoding

        B, C, H, W = x1.size()
        x1_flat = x1.view(B, C, -1).permute(2, 0, 1)
        x2_flat = x2.view(B, C, -1).permute(2, 0, 1)

        attn_output, _ = self.multihead_attn(x1_flat, x2_flat, x2_flat)
        attn_output = attn_output.permute(1, 2, 0).view(B, C, H, W)

        output = self.norm(x1 + attn_output)
        return output

class FeatureDecoder(nn.Module):
    def __init__(self, in_channels, num_heads=8):
        super(FeatureDecoder, self).__init__()
        self.multihead_attn = MultiheadAttention(embed_dim=in_channels, num_heads=num_heads)
        self.norm1 = GroupNorm(8, in_channels)
        self.norm2 = GroupNorm(8, in_channels)
        self.ffn = nn.Sequential(
            Linear(in_channels, in_channels * 4),
            nn.ReLU(),
            Linear(in_channels * 4, in_channels),
        )
        self.dropout = Dropout(0.1)

    def forward(self, x):
        B, C, H, W = x.size()
        x_flat = x.view(B, C, -1).permute(2, 0, 1)

        attn_output, _ = self.multihead_attn(x_flat, x_flat, x_flat)
        attn_output = attn_output.permute(1, 2, 0).view(B, C, H, W)

        x = self.norm1(x + attn_output)
        x = x + self.ffn(x.view(B, C, -1).permute(2, 0, 1)).permute(1, 2, 0).view(B, C, H, W)
        x = self.norm2(x)

        return x

class ClassificationAndRegression(nn.Module):
    def __init__(self, in_channels):
        super(ClassificationAndRegression, self).__init__()
        self.cls_conv = nn.Conv2d(in_channels, 2, kernel_size=1)
        self.reg_conv = nn.Conv2d(in_channels, 4, kernel_size=1)

    def forward(self, x):
        cls_output = self.cls_conv(x)
        reg_output = self.reg_conv(x)
        return cls_output, reg_output

class ModulationLayer(nn.Module):
    def __init__(self, in_channels, reduction_ratio=16):
        super(ModulationLayer, self).__init__()
        self.gap = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(in_channels, in_channels // reduction_ratio, bias=False)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Linear(in_channels // reduction_ratio, in_channels, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.gap(x).view(b, c)
        y = self.relu(self.fc1(y))
        y = self.sigmoid(self.fc2(y)).view(b, c, 1, 1)
        return x * y.expand_as(x)

class HiFT(nn.Module):
    def __init__(self):
        super(HiFT, self).__init__()
        self.feature_extractor = FeatureExtractor()
        self.feature_encoder = FeatureEncoder(in_channels=256, out_channels=256)
        self.modulation_layer = ModulationLayer(in_channels=256)
        self.feature_decoder = FeatureDecoder(in_channels=256)
        self.classification_and_regression = ClassificationAndRegression(in_channels=256)
        self.concat_conv = nn.Conv2d(256 * 2, 256, kernel_size=1)

    def concatenate_and_conv(self, z, x):
        concatenated = torch.cat((z, x), dim=1)
        fused = self.concat_conv(concatenated)
        return fused

    def forward(self, z, x):
        z_features = self.feature_extractor(z)
        x_features = self.feature_extractor(x)

        encoded_features = []
        for i in range(3):
            encoded = self.feature_encoder(z_features[i], x_features[i])
            modulated = self.modulation_layer(encoded)
            fused_features = self.concatenate_and_conv(z_features[i], x_features[i])
            decoded = self.feature_decoder(fused_features)
            encoded_features.append(decoded)

        final_features = sum(encoded_features)
        cls_output, reg_output = self.classification_and_regression(final_features)

        return cls_output, reg_output


In [8]:
import torch
import torch.optim as optim
import torch.nn as nn

def train(model, dataloader, epochs, device):
    model.train()
    criterion_cls = nn.CrossEntropyLoss()
    criterion_reg = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    for epoch in range(epochs):
        running_loss = 0.0
        for i, (template, search, cls_label, reg_label) in enumerate(dataloader):
            try:
              template, search, cls_label, reg_label = template.to(device), search.to(device), cls_label.to(device), reg_label.to(device)

              optimizer.zero_grad()

              cls_output, reg_output = model(template, search)

              # Ensure cls_label has the correct shape (e.g., [batch_size, num_classes, H, W])
              cls_loss = criterion_cls(cls_output, cls_label)

              # Reshape reg_label to match reg_output
              reg_label = reg_label.unsqueeze(-1).unsqueeze(-1).expand(-1, -1, 13, 13)  # Expand dimensions to match reg_output
              reg_loss = criterion_reg(reg_output, reg_label)

              loss = cls_loss + reg_loss

              loss.backward()
              optimizer.step()

              running_loss += loss.item()

              if i % 10 == 9:
                  print(f"[{epoch + 1}, {i + 1}] loss: {running_loss / 10:.3f}")
                  running_loss = 0.0
                  # early break, the dataset is too big
                  if i >= 9999:
                    break
            except:
              break

    print('Finished Training')

# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate model and move to device
model = HiFT().to(device)

# Instantiate dataset and dataloader

# Train the model for 10 epochs
train(model, train_dataloader, epochs=1, device=device)

Using cache found in /root/.cache/torch/hub/pytorch_vision_v0.10.0


[1, 10] loss: 1.542
[1, 20] loss: 0.639
[1, 30] loss: 0.457
[1, 40] loss: 0.321
[1, 50] loss: 0.291
[1, 60] loss: 0.279
[1, 70] loss: 0.183
[1, 80] loss: 0.159
[1, 90] loss: 0.134
[1, 100] loss: 0.148
[1, 110] loss: 0.114
[1, 120] loss: 0.117
[1, 130] loss: 0.103
[1, 140] loss: 0.093
[1, 150] loss: 0.088
[1, 160] loss: 0.064
[1, 170] loss: 0.073
[1, 180] loss: 0.075
[1, 190] loss: 0.058
[1, 200] loss: 0.056
[1, 210] loss: 0.057
[1, 220] loss: 0.048
[1, 230] loss: 0.035
[1, 240] loss: 0.044
[1, 250] loss: 0.046
[1, 260] loss: 0.042
[1, 270] loss: 0.046
[1, 280] loss: 0.046
[1, 290] loss: 0.051
[1, 300] loss: 0.051
[1, 310] loss: 0.051
[1, 320] loss: 0.047
[1, 330] loss: 0.032
[1, 340] loss: 0.031
[1, 350] loss: 0.032
[1, 360] loss: 0.038
[1, 370] loss: 0.031
[1, 380] loss: 0.041
[1, 390] loss: 0.032
[1, 400] loss: 0.025
[1, 410] loss: 0.038
[1, 420] loss: 0.033
[1, 430] loss: 0.025
[1, 440] loss: 0.021
[1, 450] loss: 0.026
[1, 460] loss: 0.018
[1, 470] loss: 0.028
[1, 480] loss: 0.022
[

In [12]:
def validate(model, dataloader, device):
    model.eval()
    total_cls_loss = 0.0
    total_reg_loss = 0.0
    criterion_cls = nn.CrossEntropyLoss()
    criterion_reg = nn.MSELoss()

    for i, (template, search, cls_label, reg_label) in enumerate(dataloader):
          try:
            template, search, cls_label, reg_label = (
                template.to(device),
                search.to(device),
                cls_label.to(device),
                reg_label.to(device),
            )
            cls_output, reg_output = model(template, search)

            # For CrossEntropyLoss, cls_label should be of shape [batch_size, H, W] if cls_output is [batch_size, num_classes, H, W]
            cls_label = cls_label.squeeze(-1).squeeze(-1)  # Remove extra dimensions to match output shape requirements

            # Compute classification loss
            cls_loss = criterion_cls(cls_output, cls_label)

            # Adjust reg_label to match the shape of reg_output
            reg_label = reg_label.unsqueeze(-1).unsqueeze(-1)  # Add two dimensions: [batch_size, 4, 1, 1]
            reg_label = reg_label.expand_as(reg_output)  # Expand to match reg_output shape

            # Compute regression loss
            reg_loss = criterion_reg(reg_output, reg_label)

            # Accumulate total loss
            total_cls_loss += cls_loss.item()
            total_reg_loss += reg_loss.item()
          except:
            break

    avg_cls_loss = total_cls_loss / i
    avg_reg_loss = total_reg_loss / i
    avg_total_loss = avg_cls_loss + avg_reg_loss

    print(f"Validation Classification Loss: {avg_cls_loss:.3f}")
    print(f"Validation Regression Loss: {avg_reg_loss:.3f}")
    print(f"Validation Total Loss: {avg_total_loss:.3f}")

# Validate the model
validate(model, val_dataloader, device)


Validation Classification Loss: 0.000
Validation Regression Loss: 0.005
Validation Total Loss: 0.005
