In [None]:
from torchvision import models
from torch import nn
import torch
import os
from PIL import Image
from torch.utils.data import Dataset,DataLoader
import json
import shutil
import torch.nn.functional as F
from torchvision import transforms
from torchsummary import summary
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so that files
# stored on the drive can be read through ordinary paths.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
class CustomDataset(Dataset):
    """Images paired with their 2D bounding-box annotations.

    Every file in ``data_path`` is treated as an image. Its annotation is read
    from a JSON file with the same stem, located in the directory obtained by
    replacing ``"images"`` with ``"labels"`` in ``data_path``. Only objects
    that carry a ``"box2d"`` entry in ``frames[0]["objects"]`` are kept.

    Args:
        data_path: Directory containing the image files.
        transform: Optional callable applied to the RGB PIL image in
            ``__getitem__``. It is applied to the image only; box coordinates
            are returned exactly as stored in the label file.

    Attributes:
        data: List of ``[img_path, labels]`` pairs, where ``labels`` is a list
            of ``[class_index, x1, y1, x2, y2]`` entries.

    Raises:
        ValueError: If a boxed object has a category not listed in ``CLASSES``.
    """

    # Category names in label order: a box's class index is its position here.
    CLASSES = (
        "person",
        "rider",
        "car",
        "truck",
        "bus",
        "train",
        "motor",
        "bike",
        "traffic light",
        "traffic sign",
    )

    def __init__(self, data_path, transform=None):
        self.transform = transform
        self.data = []

        class_to_index = {name: idx for idx, name in enumerate(self.CLASSES)}
        # Rewrite only the directory part, so a file name that happens to
        # contain "images" (or ".jpg" mid-name) cannot corrupt the label path.
        label_dir = data_path.replace("images", "labels")

        # os.listdir order is arbitrary; sorting keeps sample indices stable
        # across runs and machines.
        for file_name in sorted(os.listdir(data_path)):
            img_path = os.path.join(data_path, file_name)
            stem = os.path.splitext(file_name)[0]
            label_path = os.path.join(label_dir, stem + ".json")

            with open(label_path, "r") as label_file:
                annotation = json.load(label_file)

            labels = []
            for obj in annotation["frames"][0]["objects"]:
                # Objects without a 2D box are not detection targets here.
                if "box2d" not in obj:
                    continue
                category = obj["category"]
                if category not in class_to_index:
                    raise ValueError(
                        f"Unknown category {category!r} in {label_path}"
                    )
                box = obj["box2d"]
                labels.append(
                    [class_to_index[category], box["x1"], box["y1"], box["x2"], box["y2"]]
                )
            self.data.append([img_path, labels])

    def __len__(self):
        """Return the number of image/label pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, labels)``."""
        img_path, labels = self.data[idx]
        # Image.open keeps the file open until the pixels are read; convert()
        # forces the read, so the handle can be released right afterwards
        # instead of waiting for garbage collection.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        # Previously the transform passed to __init__ was stored but ignored.
        if self.transform is not None:
            image = self.transform(image)
        return image, labels

class Transform(Dataset):
    """Dataset view that runs a transform over the image of each sample.

    Args:
        base_dataset: Indexable dataset whose items are ``(image, labels)``.
        transform: Callable applied to the image; a falsy value disables it.

    The labels are passed through untouched.
    """

    def __init__(self, base_dataset, transform):
        self.transform = transform
        self.base_dataset = base_dataset

    def __len__(self):
        """Return the length of the wrapped dataset."""
        return len(self.base_dataset)

    def __getitem__(self, idx):
        """Return ``(image, labels)`` for ``idx``, transforming the image if enabled."""
        sample_image, sample_labels = self.base_dataset[idx]
        if not self.transform:
            return sample_image, sample_labels
        return self.transform(sample_image), sample_labels

def custom_collate_fn(batch):
    """Collate ``(image, labels)`` samples into one batch.

    Images are stacked along a new leading dimension ([B, C, H, W] for
    [C, H, W] inputs). The per-sample labels are returned as a plain Python
    list, one entry per sample, without any stacking or padding.
    """
    image_list = []
    label_list = []
    for sample in batch:
        image_list.append(sample[0])
        label_list.append(sample[1])
    return torch.stack(image_list, dim=0), label_list

In [None]:
# Dataset root on the mounted drive, with one sub-folder per split.
data_path = "/content/drive/MyDrive/BDD100k/images"
train_path = f"{data_path}/train"
validation_path = f"{data_path}/val"
test_path = f"{data_path}/test"

# Side length of the square network input, and samples per batch.
image_size = 256
batch_size = 32

# Preprocessing: resize to a square, convert to a tensor, then normalize with
# mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

train_dataset = Transform(CustomDataset(train_path), transform)
test_dataset = Transform(CustomDataset(test_path), transform)
val_dataset = Transform(CustomDataset(validation_path), transform)

# Only the training loader shuffles; all loaders use the custom collate
# function so per-image label lists of different lengths can be batched.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    pin_memory=True,
    num_workers=0,
    collate_fn=custom_collate_fn,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    pin_memory=True,
    num_workers=0,
    collate_fn=custom_collate_fn,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    pin_memory=True,
    num_workers=0,
    collate_fn=custom_collate_fn,
)

In [None]:
class SpatialAttention(nn.Module):
    """Spatial attention gate.

    Builds a one-channel attention map from the per-pixel mean and maximum
    over the channel dimension, squashes it with a sigmoid, and multiplies the
    input by it.

    Args:
        kernel_size: Size of the convolution applied to the pooled maps. The
            padding is ``(kernel_size - 1) // 2``, which keeps H and W
            unchanged for odd kernel sizes.
    """

    def __init__(self, kernel_size=7):
        super().__init__()
        # Two input channels: the channel-wise mean map and the max map.
        self.conv1 = nn.Conv2d(
            in_channels=2,
            out_channels=1,
            kernel_size=kernel_size,
            padding=(kernel_size - 1) // 2,
            bias=False,
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` scaled element-wise by its spatial attention map."""
        channel_mean = x.mean(dim=1, keepdim=True)        # [B, 1, H, W]
        channel_max = x.max(dim=1, keepdim=True).values   # [B, 1, H, W]
        pooled = torch.cat((channel_mean, channel_max), dim=1)  # [B, 2, H, W]
        gate = self.sigmoid(self.conv1(pooled))           # [B, 1, H, W]
        return x * gate


class EncoderBackBone(nn.Module):
    """EfficientNet-B3 feature stack with spatial attention on the later stages.

    The ``features`` stages of ``models.efficientnet_b3()`` are run in order.
    A single shared ``SpatialAttention`` module is applied after every stage
    with index 3 or higher, and the outputs of stages 3, 5 and 7 are collected
    as multi-scale features for the BiFPN.
    """

    def __init__(self):
        super().__init__()
        self.features = models.efficientnet_b3().features
        self.SAttention = SpatialAttention()

    def forward(self, x, forPose):
        """Run the backbone.

        The backbone output is meant to be split in two: multi-scale features
        for the BiFPN, and a final feature map for the pose computation (for
        which only the last output is needed).

        Args:
            x: Input batch fed to the first feature stage.
            forPose: If truthy, return the list of stage 3/5/7 outputs;
                otherwise return only the output of the last stage.
                NOTE(review): the flag name suggests the opposite of what it
                selects (the multi-scale list is what the BiFPN consumes) —
                confirm the intended meaning before renaming.
        """
        tapped = []
        for stage_idx, stage in enumerate(self.features):
            x = stage(x)
            if stage_idx >= 3:
                x = self.SAttention(x)
            if stage_idx in (3, 5, 7):
                # These feature maps are the ones sent to the BiFPN.
                tapped.append(x)
        return tapped if forPose else x

In [None]:
class DepthwiseSeparableConv(nn.Module):
    """Depthwise separable convolution used in BiFPN.

    A per-channel (grouped) convolution followed by a 1x1 pointwise
    convolution, batch normalization and a SiLU (Swish) activation. Both
    convolutions are bias-free.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
        super().__init__()
        # groups == in_channels: every input channel gets its own filter.
        self.depthwise = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=in_channels,
            bias=False,
        )
        # The 1x1 convolution mixes channels and sets the output width.
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn = nn.BatchNorm2d(out_channels)
        self.swish = nn.SiLU()  # Swish activation

    def forward(self, x):
        """Apply depthwise conv, pointwise conv, batch norm and SiLU in order."""
        return self.swish(self.bn(self.pointwise(self.depthwise(x))))

class BiFPNBlock(nn.Module):
    """Single BiFPN block with weighted feature fusion.

    Takes five pyramid levels ``[P3, P4, P5, P6, P7]`` that all have
    ``channels`` channels, runs a top-down pass (P7 -> P4) followed by a
    bottom-up pass (P3 -> P7), and returns five fused maps with the same
    spatial sizes as the inputs.

    Each fusion node computes ``sum(w_i * x_i) / (sum(w_i) + epsilon)`` with
    ``w = relu(learned weights)`` and then applies a depthwise separable conv.

    Args:
        channels: Number of channels of every pyramid level.
        epsilon: Small constant added to the weight sum to avoid dividing by
            zero when all weights are clamped to zero by the ReLU.

    NOTE(review): ``conv_p4``, ``conv_p5`` and ``conv_p6`` are each applied to
    both the intermediate (top-down) node and the output node of their level,
    so those two nodes share convolution weights and batch-norm statistics.
    This is kept as is, because giving each node its own conv would change
    the parameter layout.
    """

    def __init__(self, channels, epsilon=1e-4):
        super(BiFPNBlock, self).__init__()
        self.epsilon = epsilon
        self.channels = channels

        # Convolution layers for each level
        self.conv_p3 = DepthwiseSeparableConv(channels, channels)
        self.conv_p4 = DepthwiseSeparableConv(channels, channels)
        self.conv_p5 = DepthwiseSeparableConv(channels, channels)
        self.conv_p6 = DepthwiseSeparableConv(channels, channels)
        self.conv_p7 = DepthwiseSeparableConv(channels, channels)

        # Weight parameters for feature fusion
        # P6_td weights
        self.w1 = nn.Parameter(torch.ones(2))
        # P5_td weights
        self.w2 = nn.Parameter(torch.ones(2))
        # P4_td weights
        self.w3 = nn.Parameter(torch.ones(2))
        # P3_out weights
        self.w4 = nn.Parameter(torch.ones(2))
        # P4_out weights
        self.w5 = nn.Parameter(torch.ones(3))
        # P5_out weights
        self.w6 = nn.Parameter(torch.ones(3))
        # P6_out weights
        self.w7 = nn.Parameter(torch.ones(3))
        # P7_out weights
        self.w8 = nn.Parameter(torch.ones(2))

    def _fuse(self, raw_weights, features):
        """Return the normalized, ReLU-weighted sum of ``features``."""
        weights = F.relu(raw_weights)
        fused = weights[0] * features[0]
        for weight, feature in zip(weights[1:], features[1:]):
            fused = fused + weight * feature
        return fused / (weights.sum() + self.epsilon)

    def forward(self, inputs):
        """
        inputs: [P3, P4, P5, P6, P7] feature maps
        returns: [P3_out, P4_out, P5_out, P6_out, P7_out]
        """
        P3, P4, P5, P6, P7 = inputs

        # Top-down pathway: each level is fused with the upsampled coarser level.
        P6_td = self.conv_p6(
            self._fuse(self.w1, [P6, self.up_sampling(P7, P6.shape[-2:])])
        )
        P5_td = self.conv_p5(
            self._fuse(self.w2, [P5, self.up_sampling(P6_td, P5.shape[-2:])])
        )
        P4_td = self.conv_p4(
            self._fuse(self.w3, [P4, self.up_sampling(P5_td, P4.shape[-2:])])
        )

        # Bottom-up pathway: each level is fused with its top-down node and the
        # downsampled finer output level.
        P3_out = self.conv_p3(
            self._fuse(self.w4, [P3, self.up_sampling(P4_td, P3.shape[-2:])])
        )
        P4_out = self.conv_p4(
            self._fuse(self.w5, [P4, P4_td, self.down_sampling(P3_out, P4.shape[-2:])])
        )
        P5_out = self.conv_p5(
            self._fuse(self.w6, [P5, P5_td, self.down_sampling(P4_out, P5.shape[-2:])])
        )
        P6_out = self.conv_p6(
            self._fuse(self.w7, [P6, P6_td, self.down_sampling(P5_out, P6.shape[-2:])])
        )
        P7_out = self.conv_p7(
            self._fuse(self.w8, [P7, self.down_sampling(P6_out, P7.shape[-2:])])
        )

        return [P3_out, P4_out, P5_out, P6_out, P7_out]

    def up_sampling(self, x, target_size):
        """Upsampling with nearest neighbor interpolation"""
        return F.interpolate(x, size=target_size, mode='nearest')

    def down_sampling(self, x, target_size):
        """Downsample ``x`` to exactly ``target_size`` (H, W) with max pooling.

        Returns ``x`` itself when it already has the target size. When a
        square max pool with ``kernel = stride = W_in // W_out`` lands exactly
        on the target size, that pool is used. Otherwise (for example 19 -> 10
        or 5 -> 3, which occur when a level is the rounded-up half of the
        previous one) adaptive max pooling is used, so the result always
        matches ``target_size`` and can be added to the other fusion inputs.
        """
        target_size = tuple(int(side) for side in target_size)
        in_h, in_w = int(x.shape[-2]), int(x.shape[-1])
        if (in_h, in_w) == target_size:
            return x

        # A max pool with kernel == stride and no padding yields size // stride.
        stride = in_w // target_size[-1]
        if stride >= 1 and (in_h // stride, in_w // stride) == target_size:
            return F.max_pool2d(x, kernel_size=stride, stride=stride)

        # Sizes are not related by one integer factor; the fixed-stride pool
        # would produce the wrong shape, so let PyTorch choose the windows.
        return F.adaptive_max_pool2d(x, target_size)

class BiFPN(nn.Module):
    """Complete BiFPN: input projections, two extra levels, stacked blocks.

    Args:
        in_channels_list: Channel count of each backbone feature map, ordered
            from the finest to the coarsest level.
        out_channels: Channel count shared by all pyramid levels.
        num_blocks: Number of ``BiFPNBlock`` modules applied in sequence.
    """

    def __init__(self, in_channels_list, out_channels=256, num_blocks=3):
        super().__init__()
        self.out_channels = out_channels
        self.num_blocks = num_blocks

        # One 1x1 projection per backbone level, bringing every level to
        # `out_channels` channels.
        projections = [
            nn.Conv2d(level_channels, out_channels, kernel_size=1, bias=False)
            for level_channels in in_channels_list
        ]
        self.input_convs = nn.ModuleList(projections)

        # Two extra, coarser levels produced by stride-2 3x3 convolutions:
        # P6 from the raw last backbone map, P7 from P6.
        coarsest_channels = in_channels_list[-1]
        self.p6_conv = nn.Conv2d(
            coarsest_channels, out_channels, kernel_size=3, stride=2, padding=1
        )
        self.p7_conv = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=2, padding=1
        )

        # Stack of fusion blocks applied one after another.
        self.bifpn_blocks = nn.ModuleList(
            [BiFPNBlock(out_channels) for _ in range(num_blocks)]
        )

    def forward(self, inputs):
        """
        inputs: List of feature maps from backbone [C3, C4, C5]
        returns: List of enhanced feature maps [P3, P4, P5, P6, P7]
        """
        # Project each backbone map to the shared channel width.
        projected = [
            self.input_convs[level](fmap) for level, fmap in enumerate(inputs)
        ]

        # The extra levels come from the unprojected coarsest backbone map.
        p6 = self.p6_conv(inputs[-1])
        p7 = self.p7_conv(p6)

        pyramid_features = [*projected, p6, p7]
        for fusion_block in self.bifpn_blocks:
            pyramid_features = fusion_block(pyramid_features)
        return pyramid_features

In [None]:
# Run the backbone + BiFPN over the validation split and collect the pyramid
# features of every batch.
#
# eval(): the loop only does forward passes on validation data, so batch-norm
# running statistics must not be updated and stochastic layers must be off.
efficient_backbone = EncoderBackBone().to(device).eval()
# The channel list must match the three feature maps the backbone returns.
biFPN = BiFPN([48, 136, 384], 256, 3).to(device).eval()

pred_labels = []
# no_grad(): without it every stored output keeps its whole autograd graph
# alive, so memory grows with each batch appended to `pred_labels`.
with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device, non_blocking=True)
        x = efficient_backbone(images, forPose=True)
        out = biFPN(x)
        # Keep results on the CPU so accumulated batches do not fill GPU memory.
        pred_labels.append([level.cpu() for level in out])

