In [None]:
import numpy as np
import pandas as pd
import os
from PIL import Image
import random
import seaborn as sns
import cv2
from tqdm import tqdm_notebook as tqdm
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import torch
import torchvision
from sklearn.metrics import classification_report
from scipy.special import softmax
import time
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, utils, datasets
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from sklearn.metrics import classification_report, confusion_matrix
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from albumentations import Resize, Compose
from albumentations.pytorch.transforms import ToTensor
from albumentations.augmentations.transforms import Normalize

In [None]:
def preprocessing_image(img_path):
    """Read an image file and turn it into a normalized one-image batch.

    The file is read with OpenCV, converted from BGR to RGB, resized to
    244x244 with nearest-neighbour interpolation, normalized with the
    per-channel mean/std given below and converted by albumentations'
    ``ToTensor``.

    Args:
        img_path: Path of the image file to read.

    Returns:
        The transformed image with a leading batch dimension added by
        ``torch.unsqueeze(..., 0)``.

    Raises:
        OSError: If OpenCV cannot read ``img_path``.
    """
    # Named `pipeline` so the torchvision `transforms` module imported at the
    # top of the notebook is not shadowed inside this function.
    # NOTE(review): 244 may be a typo for 224; kept as-is, confirm against the
    # training pipeline.
    pipeline = Compose([
        Resize(244, 244, interpolation=cv2.INTER_NEAREST),
        # These values match the widely used ImageNet channel statistics.
        Normalize([0.485, 0.456, 0.406],
                  [0.229, 0.224, 0.225]),
        ToTensor(),
    ])
    input_img = cv2.imread(img_path)
    if input_img is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        raise OSError('could not read image: {}'.format(img_path))
    input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
    input_data = pipeline(image=input_img)['image']
    # add batch dimension
    batch_data = torch.unsqueeze(input_data, 0)
    return batch_data

def preprocessing_image_sample(img_path):
    """Read an image file and turn it into an un-normalized one-image batch.

    The file is read with OpenCV, converted from BGR to RGB, resized to
    244x244 with nearest-neighbour interpolation and converted by
    albumentations' ``ToTensor``. No mean/std normalization is applied.

    Args:
        img_path: Path of the image file to read.

    Returns:
        The transformed image with a leading batch dimension added by
        ``torch.unsqueeze(..., 0)``.

    Raises:
        OSError: If OpenCV cannot read ``img_path``.
    """
    # Named `pipeline` so the torchvision `transforms` module imported at the
    # top of the notebook is not shadowed inside this function.
    pipeline = Compose([
        Resize(244, 244, interpolation=cv2.INTER_NEAREST),
        ToTensor(),
    ])
    input_img = cv2.imread(img_path)
    if input_img is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        raise OSError('could not read image: {}'.format(img_path))
    input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
    input_data = pipeline(image=input_img)['image']
    # add batch dimension
    batch_data = torch.unsqueeze(input_data, 0)
    return batch_data
def load_data(root_path):
    """Index the labelled images found under ``root_path``.

    Only sub-directories named 'porn', 'neutral' or 'sexy' are scanned.
    Class indices are assigned in alphabetical order of the folders that
    exist, so the mapping is the same on every run and every filesystem
    (``os.listdir`` returns entries in arbitrary order).

    Args:
        root_path: Directory containing one sub-directory per class.

    Returns:
        A tuple ``(test_data, classes)`` where ``test_data`` maps each image
        path to its integer label and ``classes`` maps each class folder name
        to its integer index.
    """
    wanted = ('porn', 'neutral', 'sexy')
    subfolders = sorted(
        d for d in os.listdir(root_path)
        if d in wanted and os.path.isdir(os.path.join(root_path, d))
    )
    classes = {c: i for i, c in enumerate(subfolders)}
    print('there are three classes: {}'.format(classes))

    image_exts = ('.jpg', '.jpeg', '.png')
    test_data = {}
    for folder in subfolders:
        label = classes[folder]
        # Sorted so the sample order is reproducible as well.
        for names in sorted(os.listdir(os.path.join(root_path, folder))):
            # Compare case-insensitively so files such as 'IMG.JPG' are kept.
            if names.lower().endswith(image_exts):
                test_data[os.path.join(root_path, folder, names)] = label
    print('there are {} samples in the testing set'.format(len(test_data)))
    return test_data, classes

def postprocessing(output):
    """Convert raw model scores into probabilities for the first sample.

    Args:
        output: Tensor of scores; softmax is taken along dimension 1.

    Returns:
        The softmax result for index 0 of the leading dimension.
    """
    probabilities = torch.softmax(output, dim=1)
    return probabilities[0]

def postprocessing_0(output):
    """Rank the class indices of the first sample from best to worst score.

    Args:
        output: Tensor of scores; sorting runs along the last dimension.

    Returns:
        The sorted index tensor for index 0 of the leading dimension, with
        the highest-scoring class first.
    """
    # Softmax is monotonic, so ranking the raw scores gives the same order;
    # the previously computed (and unused) confidence tensor is dropped.
    _, indices = torch.sort(output, descending=True)
    return indices[0]

def compute_accuracy(gts,dts):
    """Count how many ground-truth entries have an equal detection entry.

    Args:
        gts: Mapping from key to ground-truth value.
        dts: Mapping from key to predicted value; must contain every key of
            ``gts``.

    Returns:
        A tuple ``(accurate, total)``: the number of matching keys and the
        number of keys in ``gts``.
    """
    accurate = sum(1 for key in gts.keys() if gts[key] == dts[key])
    total = len(gts)
    return accurate, total

In [None]:
class Args:
    """Settings read by the classifier evaluation cells that follow."""
    # Dataset root; a later cell appends '/test/' to it to locate the test split.
    data_dir = './data/clean_nsfw/'
    # Name looked up in torchvision.models to build the network.
    model = 'resnet50'
    # Not referenced by the visible cells.
    device = [0]
    # Not referenced by the visible cells (inference runs one image at a time).
    batch_size = 8
    # Not referenced by the visible cells.
    print_freq = 10
    # Checkpoint file; the setup code reads its 'classes' and 'model' entries.
    checkpoint = './ckpt/model_26_0.pth'
    # Not referenced by the visible cells.
    workers = 1


args = Args()
# Read the checkpoint from disk once and reuse it for both the class list and
# the weights (it used to be deserialized twice).
checkpoint = torch.load(args.checkpoint)
classes = checkpoint['classes']
print(classes)
# Build the architecture named in the settings without pretrained weights,
# then replace its final layer so it has one output per class.
model = torchvision.models.__dict__[args.model](pretrained=False)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(classes))
model.cuda()
model.load_state_dict(checkpoint['model'])
model.eval()

criterion = nn.CrossEntropyLoss()


In [None]:
# test_data is a dictionary mapping each image path to its label: {path: label}
test_data, classes = load_data(f'{args.data_dir}/test/')

In [None]:
# PyTorch evaluation: run every test image through the model, one per forward
# pass, and keep the predicted class probabilities keyed by image path.
count = 0
start_time = time.time()
save_sample = {}
with torch.no_grad():
    model.eval()
    model.cuda()
    test_res = {}
    for count, key in enumerate(test_data, start=1):
        image = preprocessing_image(key).cuda()
        output = model(image)
        pred = postprocessing(output)
        # Store the probabilities on the host as a NumPy array.
        test_res[key] = pred.cpu().numpy()

end_time = time.time()
elapsed = end_time - start_time
print('RunTime: {}, FPS: {}'.format(elapsed, len(test_data) / elapsed))

In [None]:
def per_class_analysis (y_pred, y_true, classes):
    """Compute per-class precision and recall from top-1 predictions.

    Args:
        y_pred: Mapping from sample key to its score vector.
        y_true: Mapping from sample key to its true class index.
        classes: Mapping from class name to class index.

    Returns:
        A tuple ``(precision, recall, y_pred_clean, y_true_clean)``: two lists
        with one value per class, followed by the predicted and true labels
        in iteration order of ``y_pred``.
    """
    n_classes = len(classes)
    TP = [0] * n_classes
    FN = [0] * n_classes
    FP = [0] * n_classes
    y_pred_clean = []
    y_true_clean = []

    for key, scores in y_pred.items():
        _, label_pred = class_score_dict(scores, classes)
        label_true = y_true[key]
        y_pred_clean.append(label_pred)
        y_true_clean.append(label_true)
        if label_true != label_pred:
            # A miss is a false positive for the predicted class and a false
            # negative for the true one.
            FP[label_pred] += 1
            FN[label_true] += 1
            continue
        TP[label_true] += 1

    # The small epsilon keeps the ratios defined when a class has no samples.
    precision = [TP[i] / (TP[i] + FP[i] + 1e-16) for i in range(n_classes)]
    recall = [TP[i] / (TP[i] + FN[i] + 1e-16) for i in range(n_classes)]

    return precision, recall, y_pred_clean, y_true_clean
        
        

def class_score_dict (y,classes):
    """Pair every class index with its score and find the top-scoring index.

    Args:
        y: Score vector indexable by class index.
        classes: Mapping from class name to class index.

    Returns:
        A tuple ``(y_class, y_largest)``: a dict from class index to its
        score in ``y``, and the result of ``np.argmax(y, axis=0)``.
    """
    y_class = {}
    for class_index in classes.values():
        y_class[class_index] = y[class_index]

    y_largest = np.argmax(y, axis=0)
    return y_class, y_largest

def per_class_analysis_threshold (y_pred, y_true, classes,threshold):
    """Compute per-class precision/recall, rejecting low-confidence predictions.

    A prediction whose top score is below ``threshold`` is relabelled to an
    extra "rejected" class whose index is ``len(classes)``. That index used to
    be the literal 3, which was only right for exactly three classes.

    Args:
        y_pred: Mapping from sample key to its score vector.
        y_true: Mapping from sample key to its true class index.
        classes: Mapping from class name to class index.
        threshold: Minimum top score for a prediction to be kept.

    Returns:
        A tuple ``(precision, recall, y_pred_clean, y_true_clean)``. The two
        metric lists have ``len(classes) + 1`` entries; the last entry (the
        rejected bucket) is left at 0. The label lists follow the iteration
        order of ``y_pred``.
    """
    n_classes = len(classes)
    rejected_label = n_classes  # extra bucket placed after the real classes
    TP = [0] * (n_classes + 1)
    FN = [0] * (n_classes + 1)
    FP = [0] * (n_classes + 1)
    precision = [0] * (n_classes + 1)
    recall = [0] * (n_classes + 1)
    y_pred_clean = []
    y_true_clean = []
    for key in y_pred.keys():
        confidence, label_pred = class_score_dict(y_pred[key], classes)
        label_true = y_true[key]

        if confidence[label_pred] < threshold:
            label_pred = rejected_label

        y_pred_clean.append(label_pred)
        y_true_clean.append(label_true)
        if label_true == label_pred:
            TP[label_true] += 1
        else:
            # A rejected sample counts as a false positive of the rejected
            # bucket and a false negative of its true class.
            FP[label_pred] += 1
            FN[label_true] += 1

    # Metrics are only computed for the real classes; the epsilon keeps the
    # ratios defined when a class has no samples.
    for i in range(n_classes):
        precision[i] = TP[i] / (TP[i] + FP[i] + 1e-16)
        recall[i] = TP[i] / (TP[i] + FN[i] + 1e-16)

    return precision, recall, y_pred_clean, y_true_clean

'''
error_matrix[m][n] is the number of samples whose true class is m that were recognized as class n.

'''
def error_analysis (y_true,y_pred,target_names):
    """Build a confusion matrix from parallel lists of labels.

    Args:
        y_true: Sequence of true class indices.
        y_pred: Sequence of predicted class indices; its length determines
            how many samples are counted.
        target_names: Sequence with one entry per class; only its length is
            used, to size the matrix.

    Returns:
        An integer array where entry ``[m][n]`` counts the samples of true
        class ``m`` that were predicted as class ``n``.
    """
    size = len(target_names)
    error_matrix = np.zeros([size, size], dtype=int)
    # Hits and misses are tallied the same way (the diagonal holds the hits),
    # so the previous if/else with two identical branches is not needed.
    for i in range(len(y_pred)):
        error_matrix[y_true[i], y_pred[i]] += 1

    return error_matrix

def threshold_evaluation(y_pred,y_true,classes,thre_start,thre_end):
    """Sweep the confidence threshold and record precision/recall per class.

    Thresholds run from ``thre_start`` up to (not including) ``thre_end`` in
    steps of 0.01. Only class indices 0, 1 and 2 are recorded.

    Args:
        y_pred: Mapping from sample key to its score vector.
        y_true: Mapping from sample key to its true class index.
        classes: Mapping from class name to class index.
        thre_start: First threshold of the sweep.
        thre_end: Exclusive upper bound of the sweep.

    Returns:
        A tuple ``(x, y_0, y_1, y_2)``: the threshold array, followed by one
        list per class of ``[precision, recall]`` pairs, one pair per
        threshold.
    """
    x = np.arange(thre_start, thre_end, 0.01)
    curves = ([], [], [])
    for thre in x:
        precision, recall, _, _ = per_class_analysis_threshold(y_pred, y_true, classes, thre)
        for class_index, curve in enumerate(curves):
            curve.append([precision[class_index], recall[class_index]])
    y_0, y_1, y_2 = curves
    return x, y_0, y_1, y_2

In [None]:
def plot_line_thre (x,y):
    """Plot precision, recall and F1 against the values in ``x``.

    Args:
        x: Sequence of x-axis values, one per entry of ``y``.
        y: Sequence of ``[precision, recall]`` pairs.
    """
    precisions = [pair[0] for pair in y]
    recalls = [pair[1] for pair in y]
    f1_scores = []
    for p, r in zip(precisions, recalls):
        denominator = p + r
        # With no true positives both values are 0 and the F1 formula would
        # divide by zero; report 0 for that point instead of crashing.
        f1_scores.append(2 * p * r / denominator if denominator > 0 else 0.0)
    plt.plot(x, precisions, label='precision')
    plt.plot(x, recalls, label='recall')
    plt.plot(x, f1_scores, label='F1')
    plt.xlabel('confidence threshold')
    plt.legend()
    plt.show()
    

In [None]:
# Sweep the confidence threshold from 0.5 up to (not including) 1 and collect
# the [precision, recall] pairs for class indices 0, 1 and 2.
x_thre, y_thre_0, y_thre_1, y_thre_2 = threshold_evaluation(
    test_res, test_data, classes, thre_start=0.5, thre_end=1
)

In [None]:
# Precision / recall / F1 curves for class index 0 across the threshold sweep.
plot_line_thre(x=x_thre, y=y_thre_0)

In [None]:
# threshold
precision, recall, y_pred_clean, y_true_clean = per_class_analysis_threshold(test_res,test_data,classes,0.99)
target_names = ['neutral', 'porn','sexy','unqualified']
# Pass the label set explicitly: without `labels`, classification_report
# raises when a label (e.g. the 'unqualified' bucket) never occurs, because
# the number of observed labels no longer matches len(target_names).
print(classification_report(
    y_true_clean,
    y_pred_clean,
    labels=list(range(len(target_names))),
    target_names=target_names,
))
print(error_analysis(y_true_clean, y_pred_clean, target_names))

In [None]:
precision, recall, y_pred_clean, y_true_clean = per_class_analysis(test_res,test_data,classes)

target_names = ['neutral', 'porn','sexy']
# Pass the label set explicitly: without `labels`, classification_report
# raises when one of the classes never occurs in the true or predicted
# labels, because the observed label count no longer matches
# len(target_names).
print(classification_report(
    y_true_clean,
    y_pred_clean,
    labels=list(range(len(target_names))),
    target_names=target_names,
))
print(error_analysis(y_true_clean, y_pred_clean, target_names))


In [None]:
def show_multi_images (img_list):
    """Paste images into a 5-column grid of 400x400 tiles and display it.

    The canvas is at least 2000x4000 pixels (10 rows) and grows by one row
    per five extra images, so images beyond the 50th no longer fall outside
    the canvas.

    Args:
        img_list: Iterable of image file paths, placed left to right and top
            to bottom in the given order.
    """
    columns, tile = 5, 400
    img_list = list(img_list)
    # Ceiling division, with the original 10 rows kept as a minimum.
    rows = max(10, (len(img_list) + columns - 1) // columns)
    xpixels, ypixels = columns * tile, rows * tile

    result = Image.new("RGB", (xpixels, ypixels))

    for index, file in enumerate(img_list):
        # resize() returns a new image, so the source file can be closed as
        # soon as the tile has been produced.
        with Image.open(file) as img:
            tile_img = img.resize((tile, tile))
        x = index % columns * tile
        y = index // columns * tile
        w, h = tile_img.size
        result.paste(tile_img, (x, y, x + w, y + h))

    # DPI, here, has _nothing_ to do with your screen's DPI.
    dpi = 80.0

    # figsize is (width, height) in inches; the two were previously swapped,
    # giving a wide figure for a tall image.
    plt.figure(figsize=(xpixels / dpi, ypixels / dpi), dpi=dpi)
    plt.imshow(result)
def obtain_error_list(y_pred, y_true,indexA,indexB):
    """List the samples of true class ``indexA`` predicted as ``indexB``.

    The predicted class is the argmax of the score vector, computed directly
    so the function does not read the notebook-level ``classes`` variable.

    Args:
        y_pred: Mapping from sample key to its score vector.
        y_true: Mapping from sample key to its true class index.
        indexA: True class index to select.
        indexB: Predicted class index to select.

    Returns:
        The keys of ``y_pred`` that match, in iteration order.
    """
    error_list = []
    for key, scores in y_pred.items():
        label_pred = np.argmax(scores, axis=0)
        label_true = y_true[key]
        if label_pred == indexB and label_true == indexA:
            error_list.append(key)
    return error_list

def filtered_accuracy (y_pred, y_true,indexA,conf_thre):
    """Accuracy among confident predictions of class ``indexA``.

    Only samples whose predicted class (argmax of the score vector) is
    ``indexA`` and whose top score exceeds ``conf_thre`` are counted. Scores
    are read straight from the score vector, so the function does not read
    the notebook-level ``classes`` variable.

    Args:
        y_pred: Mapping from sample key to its score vector.
        y_true: Mapping from sample key to its true class index.
        indexA: Predicted class index to evaluate.
        conf_thre: Score that a prediction must exceed to be counted.

    Returns:
        Correct predictions divided by counted predictions (plus a small
        epsilon, so the result is 0 when nothing is counted). The two counts
        are also printed.
    """
    total = 0
    acc = 0
    for key, scores in y_pred.items():
        label_pred = np.argmax(scores, axis=0)
        label_true = y_true[key]
        if label_pred != indexA:
            continue
        if scores[label_pred] > conf_thre:
            total += 1
            if label_pred == label_true:
                acc += 1
    print(total,acc)
    return acc/(total+1e-16)

def obtain_required_list(y_pred, y_true,indexA):
    """Collect the samples of true class ``indexA`` and their score for it.

    Scores are read straight from the score vector, so the function does not
    read the notebook-level ``classes`` variable.

    Args:
        y_pred: Mapping from sample key to its score vector.
        y_true: Mapping from sample key to its true class index.
        indexA: True class index to select.

    Returns:
        A tuple ``(img_list, img_conf_list)``: the matching keys and, in the
        same order, each sample's score at position ``indexA``.
    """
    img_list = []
    img_conf_list = []
    for key, scores in y_pred.items():
        if y_true[key] == indexA:
            img_list.append(key)
            img_conf_list.append(scores[indexA])

    return img_list, img_conf_list

    

In [None]:
# Show the test images whose true class index is 2 but were predicted as 1.
error_list = obtain_error_list(test_res, test_data, indexA=2, indexB=1)
show_multi_images(error_list)

In [None]:
# Accuracy among images predicted as class index 2 with a score above 0.5.
filtered_accuracy(test_res, test_data, indexA=2, conf_thre=0.5)
# Output recorded in the original notebook:
# 1401 1381
# 0.9857244825124911

In [None]:
# Take every test image whose true class index is 2 together with the model's
# score for that class, then display the ones scored above the threshold.
img_list, img_conf_list = obtain_required_list(test_res, test_data, indexA=2)
conf_thre = 0.98
error_list = [path for path, conf in zip(img_list, img_conf_list) if conf > conf_thre]
show_multi_images(error_list)

## Infant detection

In [None]:
# Infant section: this redefines the Args class declared for the earlier
# evaluation cells, so `Args` refers to these settings from here on.
class Args:
    """Settings read by the infant-image evaluation cells that follow."""
    # Not referenced by the visible cells (the data cell hard-codes './dataset').
    data_dir = './dataset/'
    # Name looked up in torchvision.models to build the network.
    model = 'resnet50'
    # Not referenced by the visible cells.
    device = [0]
    # Not referenced by the visible cells.
    batch_size = 8
    # Not referenced by the visible cells.
    print_freq = 10
    # Checkpoint file; the setup code reads its 'model' entry.
    checkpoint = './ckpt/model_22_0.pth'
    # Not referenced by the visible cells.
    workers = 1


args = Args()
# The output layer is sized from `classes` as left by an earlier cell; the
# class list stored in this checkpoint is not read here.
build_model = torchvision.models.__dict__[args.model]
model = build_model(pretrained=False)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(classes))
model.cuda()
checkpoint_state = torch.load(args.checkpoint)
model.load_state_dict(checkpoint_state['model'])
model.eval()

criterion = nn.CrossEntropyLoss()


In [None]:
# Index every .jpg/.jpeg/.png file of the infant folder as {path: label}.
root_path = './dataset'
folder = 'infant'
test_data = {}
items = os.listdir(os.path.join(root_path, folder))
for names in items:
    if not names.endswith((".jpg", ".jpeg", ".png")):
        continue
    label = 0  # neutral
    test_data[os.path.join(root_path, folder, names)] = label


In [None]:
# Run every infant image through the model, one per forward pass, and keep
# the predicted class probabilities keyed by image path.
count = 0
start_time = time.time()
with torch.no_grad():
    model.eval()
    model.cuda()
    test_res = {}
    for count, key in enumerate(test_data, start=1):
        image = preprocessing_image(key).cuda()
        output = model(image)
        pred = postprocessing(output)
        # Store the probabilities on the host as a NumPy array.
        test_res[key] = pred.cpu().numpy()

end_time = time.time()
elapsed = end_time - start_time
print('RunTime: {}, FPS: {}'.format(elapsed, len(test_data) / elapsed))



In [None]:
# Per-class precision/recall and the flat label lists for the infant images.
precision, recall, y_pred_clean, y_true_clean = per_class_analysis(
    y_pred=test_res, y_true=test_data, classes=classes
)

In [None]:
# Recompute the infant metrics and print the confusion matrix.
# `target_names` is whatever an earlier report cell last assigned.
precision, recall, y_pred_clean, y_true_clean = per_class_analysis(
    y_pred=test_res, y_true=test_data, classes=classes
)
infant_confusion = error_analysis(y_true_clean, y_pred_clean, target_names)
print(infant_confusion)

In [None]:
# Show the infant images whose true label is 0 and whose prediction is 0.
# NOTE(review): with both indices 0 this selects the correctly classified
# images, not errors, despite the variable name. Confirm that is intended.
error_list = obtain_error_list(test_res, test_data, indexA=0, indexB=0)
show_multi_images(error_list)

In [None]:
import torch