In [None]:
%load_ext autoreload
%autoreload 2
import matplotlib.pyplot as plt
import torch
from torchvision import models, transforms
from PIL import Image
import torch.nn as nn
import pandas as pd
import os
import numpy as np
import glob


In [None]:
class ResizeToMaxDim:
    """Resize a PIL image so that its longer side equals ``max_size``.

    The aspect ratio is kept: the shorter side is scaled by the same factor
    and truncated to an integer. Square images take the "height" branch.
    """

    def __init__(self, max_size):
        self.max_size = max_size

    def __call__(self, img):
        width, height = img.size
        if width > height:
            # Landscape: the width is pinned to max_size.
            scale = self.max_size / width
            target_size = (self.max_size, int(height * scale))
        else:
            # Portrait or square: the height is pinned to max_size.
            scale = self.max_size / height
            target_size = (int(width * scale), self.max_size)
        return img.resize(target_size, Image.LANCZOS)
    
class PadToSquare:
    """Pad a PIL image up to a ``size`` x ``size`` canvas, keeping it centred.

    The missing pixels are split evenly between both sides; when the amount
    is odd, the extra pixel goes to the right / bottom border.
    """

    def __init__(self, size, fill=0):
        self.size = size
        self.fill = fill

    def __call__(self, img):
        width, height = img.size
        missing_w = self.size - width
        missing_h = self.size - height
        left, top = missing_w // 2, missing_h // 2
        # torchvision expects the borders as (left, top, right, bottom).
        borders = (left, top, missing_w - left, missing_h - top)
        return transforms.functional.pad(img, borders, fill=self.fill)


def plot_image_with_age(model, image_path, csv_data = None, classification = False, image_size = 448):
    """
    Runs the age model on a single image and shows the image with the
    predicted and the true age in the title.

    Parameters:
    model: Torch module that is called on a (1, 3, image_size, image_size) batch.
    image_path (Union[str, np.ndarray]): Path to an image file, or an array that
        Image.fromarray accepts.
    csv_data (pd.DataFrame, optional): Table with 'img_name' and 'age' columns.
        'img_name' is matched against "lag_benchmark_cropped/<file name>".
        When omitted, when the image is not given as a path, or when no row
        matches, the true age is shown as -1.
    classification (bool): If True the title shows the softmax over the model
        outputs, otherwise the single regression output.
    image_size (int): Side length of the square network input.
    """
    transform = transforms.Compose([
        ResizeToMaxDim(image_size),
        PadToSquare(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Load the image
    given_as_path = isinstance(image_path, str)
    if given_as_path:
        image = Image.open(image_path).convert('RGB')
    else:
        image = Image.fromarray(image_path)

    # Put the input on the device the model actually lives on; guessing
    # "cuda if available" fails for a model that was left on the CPU.
    try:
        device = next(model.parameters()).device
    except (StopIteration, AttributeError):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    input_image = transform(image).unsqueeze(0).to(device)

    # Look up the true age. The CSV stores names relative to the dataset root
    # ("lag_benchmark_cropped/<file name>"), so the key is rebuilt from the
    # file name. This is only possible when the image came from a path.
    true_age = -1
    if csv_data is not None and given_as_path:
        csv_key = os.path.join("lag_benchmark_cropped", os.path.basename(image_path))
        matching_ages = csv_data.loc[csv_data['img_name'] == csv_key, 'age']
        if len(matching_ages) > 0:
            true_age = matching_ages.values[0]

    # Make prediction
    with torch.no_grad():
        output = model(input_image)

    # Plot the image with the predicted and true age
    plt.figure(figsize=(8, 8))
    plt.imshow(image)

    if classification:
        # The title shows the class probabilities, not a class label.
        # An earlier draft mapped class indices to the groups
        # 0: '0-2', 1: '3-13', 2: '18-99'.
        predicted_age_group = nn.Softmax(dim=1)(output)
        plt.title(f'Estimated Age: {predicted_age_group}\nTrue Age: {true_age}')
    else:
        predicted_age = output.item()
        plt.title(f'Estimated Age: {predicted_age:.2f}\nTrue Age: {true_age}')
    plt.axis('off')
    plt.show()

## Age Regression whole Body

In [None]:
# Load model weights
# NOTE(review): the file name says "classification" but the head below has a
# single output (regression) - confirm this is the intended checkpoint.
model_weights_path = '/usr/users/vhassle/psych_track/AgeSelf/models/age_classification_model_25.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_age = models.resnet50(pretrained=False)
num_ftrs = model_age.fc.in_features
model_age.fc = nn.Linear(num_ftrs, 1)
# map_location lets a checkpoint that was saved from a GPU load on a
# CPU-only machine, matching the CPU fallback of `device` above.
model_age.load_state_dict(torch.load(model_weights_path, map_location=device))
model = model_age.to(device)
model.eval()

# Load true ages from CSV
data = pd.read_csv('/usr/users/vhassle/datasets/lagenda/cropped_data.csv')

In [None]:
import glob

# Keep every row whose age is not 100 and turn the relative image names
# into absolute paths below the dataset root.
data_subset = data.loc[data['age'] != 100]
image_paths = [
    os.path.join('/usr/users/vhassle/datasets/lagenda', image_name)
    for image_name in data_subset['img_name']
]

print(len(image_paths))
# Show a single example (index 2134) with its predicted and true age.
for image_path in image_paths[2134:2135]:
    plot_image_with_age(model_age, image_path, csv_data=data)

## Age classification whole Body

In [None]:
# Load model weights
# Alternative checkpoint: '/usr/users/vhassle/datasets/lagenda/age_classification_model.pth'
model_weights_path = '/usr/users/vhassle/psych_track/AgeSelf/models/age_classification_model_15_focal_pad.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_age = models.resnet50(pretrained=False)
num_ftrs = model_age.fc.in_features
# Three outputs: one logit per age class.
model_age.fc = nn.Linear(num_ftrs, 3)
# map_location lets a checkpoint that was saved from a GPU load on a
# CPU-only machine, matching the CPU fallback of `device` above.
model_age.load_state_dict(torch.load(model_weights_path, map_location=device))
model = model_age.to(device)
model.eval()

# Load true ages from CSV
data_class = pd.read_csv('/usr/users/vhassle/datasets/lagenda/cropped_data_age_classes.csv')

In [None]:
# Same subset as for the regression demo: rows whose age is not 100.
data_subset = data.loc[data['age'] != 100]
image_paths = [
    os.path.join('/usr/users/vhassle/datasets/lagenda', image_name)
    for image_name in data_subset['img_name']
]

print(len(image_paths))
# Only the first image is shown.
# NOTE(review): the true age is looked up in `data`, not in the `data_class`
# table loaded in the previous cell - confirm that this is intended.
for image_path in image_paths[:1]:
    plot_image_with_age(model_age, image_path, csv_data=data, classification=True)

In [None]:

# Classify every cropped Wortschatzinsel image. No CSV is passed, so no
# true age is looked up for these images.
# Alternative input: "/usr/users/vhassle/psych_track/Gender-and-Age-Detection_Face/*.jpg"
image_paths = glob.glob("/usr/users/vhassle/datasets/Wortschatzinsel/cropped_images/*.jpg")
for image_path in image_paths:
    plot_image_with_age(model_age, image_path, csv_data=None, classification=True)

## Für die Gesichter

In [None]:

# Load model weights
# Alternative checkpoint: '/usr/users/vhassle/datasets/lagenda/age_classification_model.pth'
model_weights_path = '/usr/users/vhassle/psych_track/AgeSelf/models/age_classification_model_15_focal_pad.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_age = models.resnet50(pretrained=False)
num_ftrs = model_age.fc.in_features
# Three outputs: one logit per age class.
model_age.fc = nn.Linear(num_ftrs, 3)
# map_location lets a checkpoint that was saved from a GPU load on a
# CPU-only machine, matching the CPU fallback of `device` above.
model_age.load_state_dict(torch.load(model_weights_path, map_location=device))
model_age = model_age.to(device)
model_age.eval()
# Load true ages from CSV
data_class = pd.read_csv('/usr/users/vhassle/datasets/lagenda/cropped_data_age_classes.csv')

In [None]:

# Run the classifier on the first example image, using a 150 px network input.
# Alternative inputs:
#   "/usr/users/vhassle/psych_track/Gender-and-Age-Detection_Face/*.jpg"
#   "/usr/users/vhassle/datasets/Wortschatzinsel/cropped_images/*.jpg"
image_paths = glob.glob("/usr/users/vhassle/datasets/example_images/children/*")
for image_path in image_paths[:1]:
    plot_image_with_age(
        model_age,
        image_path,
        csv_data=None,
        classification=True,
        image_size=150,
    )

## combining face detection with age classification in a video

In [None]:
from torch.utils.data import Dataset
import decord
class VideoDataset(Dataset):
    """Dataset view of a video file: item ``i`` is frame ``i`` as an array."""

    def __init__(self, video_path):
        # Open the video with decord on CPU context 0 and remember how many
        # frames it has.
        reader = decord.VideoReader(video_path, ctx=decord.cpu(0))
        self.vr = reader
        self.length = len(reader)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        # Negative indices are rejected instead of wrapping around.
        if idx < 0:
            raise IndexError("Index out of bounds")
        if idx >= self.length:
            raise IndexError("Index out of bounds")
        return self.vr[idx].asnumpy()

# Open the example recording; the next cell reads a frame from it.
video_1 = VideoDataset("/usr/users/vhassle/datasets/Wortschatzinsel/Neon_complete/Neon/2024-05-04-11-30-31/2024_05_04_11_30_31.mp4")


In [None]:
# NOTE: this video frame is replaced right away by the still image loaded
# below; the line only checks that the video can be read at index 5000.
frame = video_1[5000]


frame = Image.open("/usr/users/vhassle/datasets/example_images/children/1000_F_140125186_ktTYm8WJ8EDK7Fc82g9A9OU3OrTa5cyg.jpg")
frame = np.asarray(frame)
plt.imshow(frame)
plt.show()
from pytorch_retinaface.detect import process_image, load_Retinanet
# Keep the face detector under its own name. Storing it in `model_age`
# overwrote the age classifier, and `process_image` was then called with
# `model`, which is the ResNet age network rather than the detector.
model_face_detection = load_Retinanet("/usr/users/vhassle/psych_track/Pytorch_Retinaface/Resnet50_Final.pth")
faces = process_image(model_face_detection, frame)


### Function that does the stuff on its own

# V1: works; 4.5 img/s on phobos, currently 1 img/s on deimos

In [None]:
%load_ext autoreload
%autoreload 2
import cv2
import torch
import torchvision.transforms as transforms
import numpy as np
from PIL import Image
from tqdm import tqdm
from torch import nn
from torch.utils.data import Dataset, DataLoader
import decord
from torchvision import models, transforms
from pytorch_retinaface.detect import process_image, load_Retinanet #self created module self installed
import os 
# Restrict this process to GPU 7.
# NOTE(review): torch is already imported above, and earlier cells may have
# used CUDA in this kernel; presumably the variable only takes effect if CUDA
# has not been initialised yet - confirm, or set it before importing torch.
os.environ["CUDA_VISIBLE_DEVICES"] = "7"

In [None]:
class ResizeToMaxDim:
    """Resize a PIL image so that its longer side equals ``max_size``.

    The aspect ratio is kept: the shorter side is scaled by the same factor
    and truncated to an integer. Square images take the "height" branch.
    """

    def __init__(self, max_size):
        self.max_size = max_size

    def __call__(self, img):
        width, height = img.size
        if width > height:
            # Landscape: the width is pinned to max_size.
            scale = self.max_size / width
            target_size = (self.max_size, int(height * scale))
        else:
            # Portrait or square: the height is pinned to max_size.
            scale = self.max_size / height
            target_size = (int(width * scale), self.max_size)
        return img.resize(target_size, Image.LANCZOS)
    
class PadToSquare:
    """Pad a PIL image up to a ``size`` x ``size`` canvas, keeping it centred.

    The missing pixels are split evenly between both sides; when the amount
    is odd, the extra pixel goes to the right / bottom border.
    """

    def __init__(self, size, fill=0):
        self.size = size
        self.fill = fill

    def __call__(self, img):
        width, height = img.size
        missing_w = self.size - width
        missing_h = self.size - height
        left, top = missing_w // 2, missing_h // 2
        # torchvision expects the borders as (left, top, right, bottom).
        borders = (left, top, missing_w - left, missing_h - top)
        return transforms.functional.pad(img, borders, fill=self.fill)


def plot_image_with_estimated_ages(image, model_age, model_face_detection, index_frame = 0, classification=False, image_size=448):
    """
    Detects faces in an image, estimates an age per face and draws the
    bounding boxes and estimates onto the image.

    Parameters:
    image (Union[str, np.ndarray]): Path to the image or a NumPy array representing
        the image. Arrays are treated as RGB (they are converted with
        COLOR_RGB2BGR for drawing).
    model_age: The model used for age estimation.
    model_face_detection: The detector handed to process_image.
    index_frame (int): Frame number written into the first annotation column.
    classification (bool): If True, classify the age group. If False, predict the exact age.
    image_size (int): The size to which each face crop is resized and padded.

    Returns:
    Tuple[np.ndarray, list]: The annotated frame in BGR channel order, and one
        MOT-style row per face:
        [index_frame, face_key, x, y, width, height, 1, -1, -1, -1, age],
        where the last entry is the class index (classification) or the
        predicted age (regression).

    Raises:
    ValueError: If image is neither a path nor an array, or the file cannot be read.
    """
    # Define transformation
    transform = transforms.Compose([
        ResizeToMaxDim(image_size),
        PadToSquare(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # Load the image
    if isinstance(image, str):
        frame = cv2.imread(image)
        if frame is None:
            # cv2.imread signals an unreadable file by returning None.
            raise ValueError(f"could not read image file: {image}")
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    elif isinstance(image, np.ndarray):
        frame_rgb = image
        frame = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
    else:
        raise ValueError("image must be a file path or a numpy array")

    # Detect faces
    faces = process_image(model_face_detection, frame_rgb)
    height, width = frame.shape[:2]

    # Run the age model on the device it lives on.
    try:
        device = next(model_age.parameters()).device
    except (StopIteration, AttributeError):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    annotations = []

    for face_key, face_area in faces.items():
        # int() keeps the values usable as slice bounds and pixel positions
        # even if the detector returns them as floats or numpy scalars.
        x1, y1, x2, y2 = (int(value) for value in face_area)
        x1, y1, x2, y2 = max(0, x1), max(0, y1), min(width, x2), min(height, y2)
        if x2 <= x1 or y2 <= y1:
            # The box lies completely outside the image: nothing to classify.
            continue

        # Extract the face
        face = frame_rgb[y1:y2, x1:x2]

        # Convert the face to PIL image and apply transformation
        face_pil = Image.fromarray(face)
        input_image = transform(face_pil).unsqueeze(0).to(device)

        # Make prediction
        with torch.no_grad():
            output = model_age(input_image)
        if not classification:
            predicted_age = output.item()
            predicted_age_text = f'Age: {predicted_age:.2f}'
            # The annotation row needs a value in regression mode as well;
            # previously `age_group` was undefined (or stale) on this path.
            age_group = predicted_age
        else:
            predicted_age_group = nn.Softmax(dim=1)(output)
            age_group = predicted_age_group.argmax(dim=1).item()
            predicted_age_text = f'{age_group}'

        # Annotate the image using OpenCV
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, predicted_age_text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

        # Prepare annotation for MOT format
        annotation = [index_frame, face_key, x1, y1, x2 - x1, y2 - y1, 1, -1, -1, -1, age_group]
        annotations.append(annotation)

    return frame, annotations

# Adapted DataLoader
class VideoDataset(Dataset):
    """Dataset view of a video file: item ``i`` is frame ``i`` as an array.

    Attributes set by the constructor:
        vr: the decord reader for the video
        length: number of frames
        video_size: array shape of the first frame
    """

    def __init__(self, video_path):
        """
        Args: video_path (str): Path to the video file
        """
        # Open the video with decord on CPU context 0.
        reader = decord.VideoReader(video_path, ctx=decord.cpu(0))
        self.vr = reader
        self.length = len(reader)
        # Decode the first frame once to learn the frame shape.
        first_frame = reader[0].asnumpy()
        self.video_size = first_frame.shape

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        return self.vr[idx].asnumpy()

# Process video and save annotated video
def process_video(video_path, model_age, model_face_detection, output_video_path, output_annotations_path, classification=False, image_size=448, max_frames=51, fps=30):
    """
    Runs face detection and age estimation on a video, writes an annotated
    copy of the video and a MOT-style annotation file.

    Parameters:
    video_path (str): Path to the input video.
    model_age: The model used for age estimation.
    model_face_detection: The face detector handed on to plot_image_with_estimated_ages.
    output_video_path (str): Where the annotated video is written.
    output_annotations_path (str): Where the annotation rows are written, one
        comma-separated row per detected face.
    classification (bool): If True, classify the age group. If False, predict the exact age.
    image_size (int): Size to which each face crop is resized and padded.
    max_frames (Optional[int]): Number of frames to process; None processes the
        whole video. The default of 51 reproduces the previous fixed cut-off
        (frames 0 to 50).
    fps (float): Frame rate of the written video.
    """
    dataset = VideoDataset(video_path)
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

    video_writer = None
    annotations = []

    try:
        for idx, frame in enumerate(tqdm(dataloader)):
            if max_frames is not None and idx >= max_frames:
                break
            # The DataLoader adds a batch dimension and returns a tensor.
            frame = frame.squeeze(0).numpy()
            annotated_frame, frame_annotations = plot_image_with_estimated_ages(
                image=frame,
                model_age=model_age,
                model_face_detection=model_face_detection,
                index_frame=idx,
                classification=classification,
                image_size=image_size,
            )

            # Initialize video writer once the frame size is known
            if video_writer is None:
                height, width, _ = annotated_frame.shape
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

            # The annotated frame is already in BGR order, which is what
            # VideoWriter expects. Converting it with COLOR_RGB2BGR again
            # swapped red and blue in the saved video.
            video_writer.write(annotated_frame)
            for annotation in frame_annotations:
                annotation[0] = idx  # Set frame index
                annotations.append(annotation)
    finally:
        # Release the writer even if a frame fails, and do not crash when
        # the video yielded no frame at all.
        if video_writer is not None:
            video_writer.release()

    # Save annotations in MOT style
    with open(output_annotations_path, 'w') as f:
        for annotation in annotations:
            f.write(','.join(map(str, annotation)) + '\n')

    if video_writer is not None:
        print(f"Annotated video saved to {output_video_path}")
    else:
        print("No frames were processed; no video was written")
    print(f"Annotations saved to {output_annotations_path}")

In [None]:
video_path = "/usr/users/vhassle/datasets/Wortschatzinsel/Neon_complete/Neon/2024-05-04-11-14-21/2024_05_04_11_14_21.mp4"
output_video_path = "/usr/users/vhassle/2024_05_04_11_14_21_faces.mp4"
output_annotations_path = "/usr/users/vhassle/2024_05_04_11_14_21_annotation.txt"


# Age classifier: ResNet-50 with a three-class head.
model_weights_path = '/usr/users/vhassle/psych_track/AgeSelf/models/age_classification_model_15_focal_pad.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_age = models.resnet50(pretrained=False)
num_ftrs = model_age.fc.in_features
model_age.fc = nn.Linear(num_ftrs, 3)
# map_location lets a checkpoint that was saved from a GPU load on a
# CPU-only machine, matching the CPU fallback of `device` above.
model_age.load_state_dict(torch.load(model_weights_path, map_location=device))
model_age = model_age.to(device)
model_age.eval()

# Face detector
model_face_detection = load_Retinanet("/usr/users/vhassle/psych_track/Pytorch_Retinaface/Resnet50_Final.pth")
model_face_detection = model_face_detection.to(device)


process_video(video_path = video_path, model_age = model_age, model_face_detection= model_face_detection, output_video_path = output_video_path, output_annotations_path = output_annotations_path, classification=True, image_size=150)

# V2 not working yet

In [None]:
%load_ext autoreload
%autoreload 2
import os 
os.environ["CUDA_VISIBLE_DEVICES"] = "7"
import cv2
import torch
import torchvision.transforms as transforms
import numpy as np
from PIL import Image
from tqdm import tqdm
from torch import nn
from torch.utils.data import Dataset, DataLoader
import decord
from torchvision import models, transforms
from pytorch_retinaface.detect import process_image, load_Retinanet #self created module self installed

In [None]:
# Batch loader over a single video file, decoded on CPU context 0.
# NOTE(review): `video_path` is a list here but a plain string in other cells.
video_path = ["/usr/users/vhassle/datasets/Wortschatzinsel/raspi_wsi1_2024_05_19_000.mp4"]
dataloader = decord.VideoLoader(
    video_path,
    ctx=[decord.cpu(0)],
    shape=(10, 1200, 1600, 3),
    interval=0,
    skip=0,
    shuffle=0,
)

In [None]:

# NOTE(review): `a` is not defined anywhere in the visible notebook, so this
# cell fails on a fresh kernel. Presumably `a` is one batch taken from
# `dataloader` with decord's torch bridge enabled - confirm and define it here.
# Reorder the axes of a[0] (presumably batch, height, width, channel ->
# batch, channel, height, width - confirm), cast to float, move to `device`.
b = a[0].permute(0,3,1,2).float().to(device)
# Keep only the first element of the batch, with the batch axis preserved.
c = b[0:1,:,:,:]
c.shape

In [None]:
# Forward pass of the face detector on a random batch of 20 frames.
# no_grad: this is inference only, so no autograd graph needs to be built.
with torch.no_grad():
    for i in range(1):
        model_face_detection(torch.rand(20,3,1200,1600).to("cuda"))

In [None]:
# 100 forward passes of the face detector on single-frame inputs, for
# comparison with the batched call.
# no_grad: this is inference only, so no autograd graph needs to be built.
with torch.no_grad():
    for i in range(100):
        model_face_detection(c+i)

# Kaum Beschleunigung durch parallele Verarbeitung hier

In [None]:
# The actual pipeline starts here
class ResizeToMaxDim:
    """Resize a PIL image so that its longer side equals ``max_size``.

    The aspect ratio is kept: the shorter side is scaled by the same factor
    and truncated to an integer. Square images take the "height" branch.
    """

    def __init__(self, max_size):
        self.max_size = max_size

    def __call__(self, img):
        width, height = img.size
        if width > height:
            # Landscape: the width is pinned to max_size.
            scale = self.max_size / width
            target_size = (self.max_size, int(height * scale))
        else:
            # Portrait or square: the height is pinned to max_size.
            scale = self.max_size / height
            target_size = (int(width * scale), self.max_size)
        return img.resize(target_size, Image.LANCZOS)

class PadToSquare:
    """Pad a PIL image up to a ``size`` x ``size`` canvas, keeping it centred.

    The missing pixels are split evenly between both sides; when the amount
    is odd, the extra pixel goes to the right / bottom border.
    """

    def __init__(self, size, fill=0):
        self.size = size
        self.fill = fill

    def __call__(self, img):
        w, h = img.size
        pad_w = (self.size - w) // 2
        pad_h = (self.size - h) // 2
        # torchvision expects the borders as (left, top, right, bottom).
        padding = (pad_w, pad_h, self.size - w - pad_w, self.size - h - pad_h)
        # `F` is never imported in this notebook, so `F.pad` raised a
        # NameError; use the functional module through `transforms`, which
        # is imported.
        return transforms.functional.pad(img, padding, fill=self.fill)

def process_image_batch(image, model_age, model_face_detection, index_frame=0, classification=False, image_size=150):
    """
    Detects faces in one frame, estimates an age per face and draws the
    bounding boxes and estimates onto the frame.

    Parameters:
    image (Union[str, np.ndarray]): Path to the image or a NumPy array representing
        the image. Arrays are treated as RGB (they are converted with
        COLOR_RGB2BGR for drawing).
    model_age: The model used for age estimation.
    model_face_detection: The detector handed to process_image.
    index_frame (int): Frame number written into the first annotation column.
    classification (bool): If True, classify the age group. If False, predict the exact age.
    image_size (int): The size to which each face crop is resized and padded.

    Returns:
    Tuple[np.ndarray, list]: The annotated frame in BGR channel order, and one
        MOT-style row per face:
        [index_frame, face_key, x, y, width, height, 1, -1, -1, -1].

    Raises:
    ValueError: If image is neither a path nor an array, or the file cannot be read.
    """
    transform = transforms.Compose([
        ResizeToMaxDim(image_size),
        PadToSquare(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    # The function previously read `frame_rgb` and `frame`, which were never
    # defined; both are now derived from the `image` argument.
    if isinstance(image, str):
        frame = cv2.imread(image)
        if frame is None:
            # cv2.imread signals an unreadable file by returning None.
            raise ValueError(f"could not read image file: {image}")
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    elif isinstance(image, np.ndarray):
        frame_rgb = image
        frame = cv2.cvtColor(frame_rgb, cv2.COLOR_RGB2BGR)
    else:
        raise ValueError("image must be a file path or a numpy array")

    # model_age needs RGB.
    # NOTE(review): an earlier comment here said model_face_detection needs
    # BGR, yet the detector call receives the RGB frame - confirm which
    # channel order process_image expects.
    faces = process_image(model_face_detection, frame_rgb)
    height, width = frame.shape[:2]

    # Run the age model on the device it lives on.
    try:
        device = next(model_age.parameters()).device
    except (StopIteration, AttributeError):
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    annotations = []

    for face_key, face_area in faces.items():
        # int() keeps the values usable as slice bounds and pixel positions
        # even if the detector returns them as floats or numpy scalars.
        x1, y1, x2, y2 = (int(value) for value in face_area)
        x1, y1, x2, y2 = max(0, x1), max(0, y1), min(width, x2), min(height, y2)
        if x2 <= x1 or y2 <= y1:
            # The box lies completely outside the image: nothing to classify.
            continue

        face = frame_rgb[y1:y2, x1:x2]
        face_pil = Image.fromarray(face)
        input_image = transform(face_pil).unsqueeze(0).to(device)

        with torch.no_grad():
            output = model_age(input_image)
        if not classification:
            predicted_age = output.item()
            predicted_age_text = f'Age: {predicted_age:.2f}'
        else:
            predicted_age_group = nn.Softmax(dim=1)(output)
            age_group = predicted_age_group.argmax(dim=1).item()
            predicted_age_text = f'{age_group}'

        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, predicted_age_text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

        annotation = [index_frame, face_key, x1, y1, x2 - x1, y2 - y1, 1, -1, -1, -1]
        annotations.append(annotation)

    return frame, annotations


In [None]:

# Dataset wrapper around decord's VideoLoader (currently unused: process_video builds its own loader)
class VideoDataset(Dataset):
    """Dataset wrapper around a decord VideoLoader.

    Item ``i`` is the pair (first element of loader entry ``i``, ``i``).

    NOTE(review): __getitem__ indexes the loader directly; confirm that
    decord's VideoLoader supports indexing and not only iteration.
    """

    def __init__(self, video_paths):
        # Only the `decord` module is imported in this notebook, so the
        # loader and the CPU context have to be reached through it; the
        # bare names `VideoLoader` and `cpu` raised a NameError.
        self.vl = decord.VideoLoader(video_paths, ctx=[decord.cpu(0)], shape=(2, 320, 240, 3), interval=1, skip=5, shuffle=1)
        self.length = len(self.vl)

    def __getitem__(self, idx):
        batch = self.vl[idx]
        return batch[0], idx

    def __len__(self):
        return self.length

def process_video(video_path, model_age, model_face_detection, output_video_path, output_annotations_path, classification=False, image_size_face=150):
    """
    Runs face detection and age estimation on a video that is decoded in
    batches with decord's VideoLoader, writes an annotated copy of the video
    and a MOT-style annotation file.

    Parameters:
    video_path (str): Path to the input video.
    model_age: The model used for age estimation.
    model_face_detection: The face detector handed on to process_image_batch.
    output_video_path (str): Where the annotated video is written.
    output_annotations_path (str): Where the annotation rows are written, one
        comma-separated row per detected face.
    classification (bool): If True, classify the age group. If False, predict the exact age.
    image_size_face (int): Size to which each face crop is resized and padded.

    Frames are decoded in batches but are still handed to process_image_batch
    one at a time; batched detection on the GPU is not implemented yet.
    """
    # Make decord return torch tensors instead of its own array type.
    decord.bridge.set_bridge('torch')
    dataloader = decord.VideoLoader([video_path], ctx=[decord.cpu(0)], shape=(10, 1200, 1600, 3), interval=0, skip=0, shuffle=0)

    video_writer = None
    annotations = []

    try:
        for frames, indices in tqdm(dataloader):
            # Element [1] of each index row is used as the frame number
            # (presumably rows are (file index, frame index) - TODO confirm).
            frame_numbers = [int(index_row[1]) for index_row in indices]

            for frame_tensor, frame_number in zip(frames, frame_numbers):
                # One frame as a NumPy array, which is the input type
                # process_image_batch accepts (assumed height x width x
                # channel, RGB - TODO confirm).
                frame_rgb = frame_tensor.cpu().numpy()
                annotated_frame, frame_annotations = process_image_batch(frame_rgb, model_age, model_face_detection, frame_number, classification, image_size_face)

                if video_writer is None:
                    height, width, _ = annotated_frame.shape
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    video_writer = cv2.VideoWriter(output_video_path, fourcc, 30, (width, height))

                # The annotated frame is returned in BGR order, which is
                # what VideoWriter expects, so it is written unchanged.
                video_writer.write(annotated_frame)
                for annotation in frame_annotations:
                    annotation[0] = frame_number
                    annotations.append(annotation)
    finally:
        # Release the writer even if a frame fails, and do not crash when
        # the video yielded no frame at all.
        if video_writer is not None:
            video_writer.release()

    with open(output_annotations_path, 'w') as f:
        for annotation in annotations:
            f.write(','.join(map(str, annotation)) + '\n')

    if video_writer is not None:
        print(f"Annotated video saved to {output_video_path}")
    else:
        print("No frames were processed; no video was written")
    print(f"Annotations saved to {output_annotations_path}")


In [None]:
video_path = "/usr/users/vhassle/datasets/Wortschatzinsel/Neon_complete/Neon/2024-05-04-11-14-21/2024_05_04_11_14_21.mp4"
output_video_path = "/usr/users/vhassle/2024_05_04_11_14_21_faces.mp4"
output_annotations_path = "/usr/users/vhassle/2024_05_04_11_14_21_annotation.txt"


# Age classifier: ResNet-50 with a three-class head.
model_weights_path = '/usr/users/vhassle/psych_track/AgeSelf/models/age_classification_model_15_focal_pad.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_age = models.resnet50(pretrained=False)
num_ftrs = model_age.fc.in_features
model_age.fc = nn.Linear(num_ftrs, 3)
# map_location lets a checkpoint that was saved from a GPU load on a
# CPU-only machine, matching the CPU fallback of `device` above.
model_age.load_state_dict(torch.load(model_weights_path, map_location=device))
model_age = model_age.to(device)
model_age.eval()

# Face detector, moved to the same device as the age model (the V1 run
# cell does the same).
model_face_detection = load_Retinanet("/usr/users/vhassle/psych_track/Pytorch_Retinaface/Resnet50_Final.pth")
model_face_detection = model_face_detection.to(device)

process_video(video_path, model_age, model_face_detection, output_video_path, output_annotations_path, classification=True, image_size_face=150)