In [16]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
from pytorchcv.model_provider import get_model as ptcv_get_model
import torch.nn as nn
import torch
from tqdm.notebook import tqdm

from collections import OrderedDict
from PIL import Image
import torchvision
from torchvision.transforms import transforms

import warnings
warnings.filterwarnings('ignore')
%matplotlib inline

import cv2
# import tensorflow as tf
# from keras.preprocessing.image import load_img
# from keras.models import Sequential, Model
# from keras.layers import Dense, Conv2D, Dropout, Flatten, MaxPooling2D, Input

# from utils.util import load_model
# from network.network_utils import build_model
# from network.optimizer_utils import get_optimizer

In [17]:
import Network as models
from Utils import *

In [18]:
gender_dict = {0:'Male', 1:'Female'}
race_dict = {0:'White', 1:'Black', 2:'Asian', 3:'Indian', 4:'Other'}

In [19]:
# def main(cfg):
#     os.environ["CUDA_VISIBLE_DEVICES"] = cfg.gpu

#     net = build_model(cfg)

#     if torch.cuda.is_available():
#         torch.backends.cudnn.benchmark = True
#         net = net.cuda()

#     optimizer = get_optimizer(cfg, net)
#     lr_scheduler = get_scheduler(cfg, optimizer)

#     if cfg.dataset_name == 'morph':
#         test_ref_dataset = morph.MorphRef(cfg=cfg, tau=cfg.tau, dataset_dir=cfg.dataset_root)
#         test_dataset = morph.MorphTest(cfg=cfg, dataset_dir=cfg.dataset_root)

#         test_ref_loader = DataLoader(test_ref_dataset, batch_size=cfg.batch_size, num_workers=cfg.num_workers, shuffle=False, pin_memory=True)
#         test_loader = DataLoader(test_dataset, batch_size=cfg.batch_size, num_workers=cfg.num_workers, shuffle=False, pin_memory=True)

#     else:
#         raise ValueError(f'Undefined database ({cfg.dataset_name}) has been given')

#     if cfg.load:
#         load_model(cfg, net, optimizer=optimizer, load_optim_params=False)

#     if lr_scheduler:
#        lr_scheduler.step()

#     net.eval()
#     test(cfg, net, test_ref_loader, test_loader)

In [74]:
class Regressor(nn.Sequential):
    def __init__(self, input_channel, output_channel):
        super(Regressor, self).__init__()
        self.convA = nn.Conv2d(input_channel, output_channel, kernel_size=1, stride=1)
        self.leakyreluA = nn.ReLU()
        self.convB = nn.Conv2d(output_channel, output_channel, kernel_size=1, stride=1)
        self.leakyreluB = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.convC = nn.Conv2d(output_channel, 1, kernel_size=1, stride=1)
        self.activation = nn.Tanh()


    def forward(self, x):
        x = self.convA(x)
        x = self.leakyreluA(x)
        x = self.convB(x)
        x = self.leakyreluB(x)
        x = self.dropout(x)
        x = self.convC(x)

        return self.activation(x)

class Global_Regressor(nn.Module):
    def __init__(self):
        super(Global_Regressor, self).__init__()
        self.encoder = ptcv_get_model("bn_vgg16", pretrained=True)
        self.avg_pool = nn.AvgPool2d(kernel_size=7)
        self.regressor = Regressor(1536, 512)

    def forward_siamese(self, x):
        x = self.encoder.features.stage1(x)
        x = self.encoder.features.stage2(x)
        x = self.encoder.features.stage3(x)
        x = self.encoder.features.stage4(x)
        x = self.encoder.features.stage5(x)
        x = self.avg_pool(x)

        return x

    def forward(self, phase, **kwargs):

        # if phase == 'train':
        #     x_1_1, x_1_2, x_2 = kwargs['x_1_1'], kwargs['x_1_2'], kwargs['x_2']
        #     x_1_1 = self.forward_siamese(x_1_1)
        #     x_1_2 = self.forward_siamese(x_1_2)
        #     x_2 = self.forward_siamese(x_2)

        #     x = torch.cat([x_1_1, x_1_2, x_2], dim=1)

        #     output = self.regressor(x)

        #     return output

        if phase == 'test':
            x_1_1, x_1_2, x_2 = kwargs['x_1_1'], kwargs['x_1_2'], kwargs['x_2']
            x = torch.cat([x_1_1, x_1_2, x_2], dim=1)

            output = self.regressor(x)

            return output

        # elif phase == 'extraction':
        #     x = kwargs['x']
        #     x = self.forward_siamese(x)

        #     return x

In [75]:
# device = torch.device("cuda:%s" % (arg.gpu) if torch.cuda.is_available() else "cpu")
# print(device)

model = Global_Regressor()

model_path = 'Back_vgg16bn_M_MWR_T0.20_2024-05-09 00:38:15/mae_Epoch_4_MAE_5.9005_CS_0.5543.pth'
initial_model = os.path.join(os.getcwd(), '../hdd1/2023/2022CVPR_code_publish/results/results_mwr/utk/MWR', model_path)

### Load network parameters ###
checkpoint = torch.load(initial_model)
model_dict = model.state_dict()

model_dict.update(checkpoint['model_state_dict'])
print(model_dict.keys())
model.load_state_dict(model_dict)
print("=> loaded checkpoint '{}'".format(initial_model))

model.eval()

odict_keys(['encoder.features.stage1.unit1.conv.weight', 'encoder.features.stage1.unit1.bn.weight', 'encoder.features.stage1.unit1.bn.bias', 'encoder.features.stage1.unit1.bn.running_mean', 'encoder.features.stage1.unit1.bn.running_var', 'encoder.features.stage1.unit1.bn.num_batches_tracked', 'encoder.features.stage1.unit2.conv.weight', 'encoder.features.stage1.unit2.bn.weight', 'encoder.features.stage1.unit2.bn.bias', 'encoder.features.stage1.unit2.bn.running_mean', 'encoder.features.stage1.unit2.bn.running_var', 'encoder.features.stage1.unit2.bn.num_batches_tracked', 'encoder.features.stage2.unit1.conv.weight', 'encoder.features.stage2.unit1.bn.weight', 'encoder.features.stage2.unit1.bn.bias', 'encoder.features.stage2.unit1.bn.running_mean', 'encoder.features.stage2.unit1.bn.running_var', 'encoder.features.stage2.unit1.bn.num_batches_tracked', 'encoder.features.stage2.unit2.conv.weight', 'encoder.features.stage2.unit2.bn.weight', 'encoder.features.stage2.unit2.bn.bias', 'encoder.feat

Global_Regressor(
  (encoder): VGG(
    (features): Sequential(
      (stage1): Sequential(
        (unit1): ConvBlock(
          (conv): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (activ): ReLU(inplace=True)
        )
        (unit2): ConvBlock(
          (conv): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (activ): ReLU(inplace=True)
        )
        (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      )
      (stage2): Sequential(
        (unit1): ConvBlock(
          (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (activ): ReLU

In [88]:
def get_reference_ages():
    return 1, 116

def load_and_preprocess_image(model, image_path):
    # Define the image transformations
    transform = transforms.Compose([
        transforms.Resize((224, 224)),  # Resize to the input size expected by VGG16
        transforms.ToTensor(),          # Convert the image to a PyTorch tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet mean normalization
                             std=[0.229, 0.224, 0.225])   # ImageNet std normalization
    ])

    # Load the image
    image = Image.open(image_path).convert('RGB')
    image = transform(image)
    image = image.unsqueeze(0)  # Add a batch dimension
    image = model.forward_siamese(image)
    return image

def predict_age(model, image_tensor):
    # Duplicate the preprocessed image tensor to match expected input format
    input_tensors = {'x_1_1': image_tensor, 'x_1_2': image_tensor, 'x_2': image_tensor}

    # Predict using the model
    with torch.no_grad():
        output = model.forward('test', **input_tensors)
        # Assuming the model outputs a single value per input batch
        output = output.squeeze().item()
    
        up_age, lb_age = get_reference_ages()

        # Convert these ages into the logarithmic scale for calculations
        log_up_age = np.log(up_age)
        log_lb_age = np.log(lb_age)

        # Calculate the mean and tau (difference) in the logarithmic scale
        mean_log_age = (log_up_age + log_lb_age) / 2
        tau = abs(mean_log_age - log_lb_age)

        # Calculate the refined age using the model's output
        refined_log_age = output * tau + mean_log_age
        predicted_age = np.exp(refined_log_age)

    return predicted_age

image_path = '/Users/josh/Downloads/UTKFace/90_1_0_20170110183452817.jpg'
# Assuming you have a function to load and preprocess the image
image_tensor = load_and_preprocess_image(model, image_path)  # Ensure this matches model input expectations

# # Simulate the same input for all three branches (if your model logic allows for this)
# input_dict = {'x_1_1': image_tensor, 'x_1_2': image_tensor, 'x_2': image_tensor}

# # Predict the age
# model.eval()
with torch.no_grad():
    predicted_age = predict_age(model, image_tensor)
    # output = model.forward('test', **input_dict)
    # predicted_age = output.squeeze().item()  # Assuming the model output is directly interpretable as age

print(f'Predicted Age: {predicted_age}')


Predicted Age: 4.424435780191368
