In [1]:
# -------------------------------------------------------------
# Import Required Libraries
# -------------------------------------------------------------
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision.models import efficientnet_b0, EfficientNet_B0_Weights
from PIL import Image
from tqdm import tqdm

In [2]:
# -------------------------------------------------------------
# DEFT Modules
# -------------------------------------------------------------
class LocalizationNetwork(nn.Module):
    """Small CNN that regresses one 2x3 affine matrix per input sample.

    Two conv -> ReLU -> max-pool stages feed a two-layer MLP whose six outputs
    are reshaped to ``(batch, 2, 3)``. The final layer starts with zero weights
    and a bias of ``[1, 0, 0, 0, 1, 0]``, so a freshly constructed network
    returns the identity affine transform for every input.

    Args:
        input_channels: Number of channels of the tensor passed to ``forward``.
    """

    def __init__(self, input_channels):
        super(LocalizationNetwork, self).__init__()
        self.conv1 = nn.Conv2d(input_channels, 8, kernel_size=7)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(8, 10, kernel_size=5)
        # The flattened size depends on the input resolution, which is unknown
        # at construction time. LazyLinear infers in_features on the first
        # forward pass while staying a registered sub-module, so the layer
        # round-trips through state_dict()/load_state_dict() instead of being
        # swapped out behind an unsaved flag.
        self.fc1 = nn.LazyLinear(32)
        self.fc2 = nn.Linear(32, 6)

        # Start from the identity transform.
        self.fc2.weight.data.zero_()
        self.fc2.bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

    def forward(self, x):
        """Return the predicted affine parameters with shape ``(batch, 2, 3)``."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Flatten everything except the batch axis for the fully connected head.
        x = x.view(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        theta = self.fc2(x)
        return theta.view(-1, 2, 3)


class WeightingModule(nn.Module):
    """Gaussian-shaped weighting computed from grid coordinates.

    For the first two components ``(gx, gy)`` of each grid location the module
    returns ``1 + lambda * exp(-(gx^2 + gy^2) / (2 * sigma^2))``, where
    ``lambda`` is a learnable scalar initialised to 0.5 and ``sigma`` is fixed.

    Args:
        sigma: Width of the Gaussian term.
    """

    def __init__(self, sigma=0.5):
        super().__init__()
        self.lambda_param = nn.Parameter(torch.tensor(0.5))
        self.sigma = sigma

    def forward(self, grid):
        """Return the weights for ``grid`` with a trailing singleton axis."""
        gx = grid[..., 0]
        gy = grid[..., 1]
        squared_radius = gx ** 2 + gy ** 2
        gaussian = torch.exp(-squared_radius / (2 * self.sigma ** 2))
        boosted = 1 + self.lambda_param * gaussian
        return boosted.unsqueeze(-1)


class DEFTModule(nn.Module):
    """Affine warp of the input followed by a per-pixel re-weighting.

    The localization network predicts an affine matrix, the input is resampled
    on the corresponding grid, and the result is multiplied by weights that the
    weighting module derives from the same grid.

    Args:
        input_channels: Channel count of the tensors passed to ``forward``.
        sigma: Forwarded to ``WeightingModule``.
    """

    def __init__(self, input_channels, sigma=0.5):
        super().__init__()
        self.localization = LocalizationNetwork(input_channels)
        self.weighting = WeightingModule(sigma)

    def forward(self, x):
        """Return the warped and weighted tensor, same shape as ``x``."""
        affine = self.localization(x)
        sampling_grid = F.affine_grid(affine, x.size(), align_corners=False)
        warped = F.grid_sample(x, sampling_grid, align_corners=False)

        # Move the trailing singleton axis of the weights into the channel
        # position; broadcasting then applies the same weight to every channel,
        # for single- and multi-channel inputs alike.
        per_pixel_weight = self.weighting(sampling_grid).permute(0, 3, 1, 2)
        return warped * per_pixel_weight



In [3]:
# -------------------------------------------------------------
# EfficientNet Feature Extractor with DEFT
# -------------------------------------------------------------
class FeatureExtractor(nn.Module):
    """EfficientNet-B0 feature trunk, optionally preceded by a DEFT module.

    The classifier head of EfficientNet-B0 is dropped: only its ``features``
    stack is kept, followed by adaptive average pooling to 1x1, and the pooled
    map is flattened to one vector per sample.

    Args:
        use_deft: When true, inputs pass through ``DEFTModule`` before the
            backbone.
    """

    def __init__(self, use_deft=True):
        super().__init__()
        self.use_deft = use_deft
        self.deft = DEFTModule(input_channels=3) if use_deft else None

        # Load EfficientNet-B0 with its default weights and keep only the
        # convolutional feature stack plus global pooling (no classifier head).
        pretrained = efficientnet_b0(weights=EfficientNet_B0_Weights.DEFAULT)
        self.backbone = nn.Sequential(
            pretrained.features,
            nn.AdaptiveAvgPool2d(1),
        )

    def forward(self, x):
        """Return one flattened feature vector per sample in ``x``."""
        if self.use_deft:
            x = self.deft(x)
        pooled = self.backbone(x)
        return pooled.flatten(start_dim=1)

# -------------------------------------------------------------
# Device selection and model construction
# -------------------------------------------------------------
# Prefer the GPU when CUDA is available, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

feature_extractor = FeatureExtractor(use_deft=True)
feature_extractor = feature_extractor.to(device)
# Inference only: switch the model to evaluation mode.
feature_extractor.eval()

# -------------------------------------------------------------
# Image preprocessing pipeline
# -------------------------------------------------------------
# Resize to 224x224, convert to a tensor, then normalise each channel with the
# mean / std values listed below.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)


def extract_features(image_path, transform, device=device):
    """Compute the feature vector of a single image file.

    The image is opened, converted to RGB, passed through ``transform``,
    batched, moved to ``device`` and fed to the module-level
    ``feature_extractor`` with gradients disabled.

    Args:
        image_path: Path of the image file to read.
        transform: Callable that maps a PIL image to a tensor.
        device: Device the input batch is moved to. Defaults to the
            module-level ``device`` as it was when this function was defined.

    Returns:
        The features as a NumPy array with the batch axis removed, or ``None``
        when anything fails (the error is printed and the frame is skipped).
    """
    try:
        # The context manager closes the underlying file even when decoding
        # fails; convert() returns an independent image that outlives it.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        batch = transform(image).unsqueeze(0).to(device)

        with torch.no_grad():
            # squeeze(0) removes only the batch axis added above, leaving any
            # other singleton dimension of the features untouched.
            features = feature_extractor(batch).squeeze(0).cpu().numpy()

        return features
    except Exception as e:
        # Deliberate best effort: report the problem and let the caller skip
        # this frame instead of aborting the whole extraction run.
        print(f"Skipping frame: {image_path} due to error: {e}")
        return None


In [9]:
# -------------------------------------------------------------
# Paths
# -------------------------------------------------------------
# The dataset root and the sequence id are each stated once; the frame
# directory, the label file and the output file name are all derived from them,
# so switching to another sequence is a one-line change.
DATASET_ROOT = "D:/Datasets/Datasets/EPIC_Kitchen"
SEQUENCE_ID = "P01_04"

rgb_path = f"{DATASET_ROOT}/RGB/{SEQUENCE_ID}"
label_csv_path = f"{DATASET_ROOT}/Label/{SEQUENCE_ID}.csv"
output_csv = f"Feature_{SEQUENCE_ID}_EpicKitchen_EfficientNetLW.csv"

# Label table; later cells read its StartFrame, EndFrame and Action_class columns.
labels_df = pd.read_csv(label_csv_path)


In [10]:
# -------------------------------------------------------------
# Extract RGB Features Only and Save
# -------------------------------------------------------------
S = 10  # stride over the sorted directory listing: every S-th entry is processed
features_list = []
all_frames_rgb = sorted(os.listdir(rgb_path))[::S]

for frame in tqdm(all_frames_rgb, desc="Extracting RGB Features Only"):
    frame_path = os.path.join(rgb_path, frame)
    rgb_features = extract_features(frame_path, transform)

    # extract_features prints the reason itself and returns None on failure.
    if rgb_features is None:
        continue

    # The frame number is the text between the last '_' and the next '.' in the
    # file name. A name that does not follow this pattern is skipped like any
    # other unusable frame instead of aborting the whole run.
    try:
        frame_number = int(frame.split('_')[-1].split('.')[0])
    except ValueError:
        print(f"Skipping frame: {frame} due to error: no frame number in file name")
        continue

    label_row = labels_df[(labels_df['StartFrame'] <= frame_number) & (labels_df['EndFrame'] >= frame_number)]

    if not label_row.empty:
        # When several label intervals contain the frame, the first row wins.
        action_class = label_row.iloc[0]['Action_class']
    else:
        action_class = 0  # default fallback

    features_list.append([frame, action_class] + rgb_features.tolist())

# ----------------- Save to CSV -----------------
if len(features_list) == 0:
    raise ValueError("No valid features extracted. Check paths and formats.")

# Size the header from a stored row rather than from the loop variable:
# rgb_features is None whenever the last processed frame failed, even though
# earlier frames were extracted successfully.
num_features = len(features_list[0]) - 2  # minus the Frame and Action_class columns
columns = ["Frame", "Action_class"] + [f"Feature_{i}" for i in range(num_features)]
df = pd.DataFrame(features_list, columns=columns)

# Create the target directory only when the output path actually has one.
output_dir = os.path.dirname(output_csv)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

df.to_csv(output_csv, index=False)
print(f"\n✅ Feature extraction completed. Saved to: {output_csv}")


Extracting RGB Features Only: 100%|██████████████████████████████████████████████████| 631/631 [00:25<00:00, 24.38it/s]



✅ Feature extraction completed. Saved to: Feature_P01_04_EpicKitchen_EfficientNetLW.csv
