In [1]:
import os
# Expose only physical GPU 6 to this process. This must run before any
# CUDA-using library initializes the driver, otherwise the mask is ignored.
os.environ["CUDA_VISIBLE_DEVICES"]="6"
# Echo the value so the notebook output records which GPU was selected.
print(os.environ["CUDA_VISIBLE_DEVICES"])

6


In [2]:
from PIL import Image
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset

import lavis
from lavis.models import load_model_and_preprocess

import random 
# Seed Python's built-in RNG for reproducibility.
# NOTE(review): this does not seed NumPy or PyTorch; seed those separately if needed.
random.seed(43)

# Lower-case file extensions (with leading dot) that TextInvDataset accepts as images.
EXT = ['.jpg', '.jpeg', '.png']

class TextInvDataset(Dataset):
    """Dataset of images discovered under one or more root folders.

    Every image found (recursively) under ``roots[i]`` is paired with
    ``labels[i]``. Items are returned as ``(image, label)``.
    """

    def __init__(self, roots, labels, vis_processors=None, txt_processors=None):
        """Scan ``roots`` for images and build the path/label table.

        Args:
            roots: sequence of directory paths to walk recursively.
            labels: sequence of labels, one per entry in ``roots``.
            vis_processors: optional callable applied to each loaded PIL image.
            txt_processors: stored on the instance; not used by this class.
        """
        assert len(roots) == len(labels), "Please assign a label for each image root."

        img_paths = []
        img_labels = []
        for root, label in zip(roots, labels):
            n_sample = 0
            for r, dirs, files in os.walk(root):
                for file in files:
                    # Compare case-insensitively so files such as "x.JPG" are not skipped.
                    if os.path.splitext(file)[-1].lower() in EXT:
                        img_paths.append(os.path.join(r, file))
                        img_labels.append(label)
                        n_sample += 1
            print(f'Found {n_sample} images with label "{label}".')

        # Kept as a DataFrame indexed by image path, as before.
        self.path_and_labels = pd.DataFrame.from_dict({
            'img_path': img_paths,
            'label': img_labels,
        })
        self.path_and_labels.set_index("img_path", inplace=True)

        self.vis_processors = vis_processors
        self.txt_processors = txt_processors

    def __len__(self):
        """Return the number of images in the dataset."""
        # len() of the frame is O(1); no need to materialize the index as a list.
        return len(self.path_and_labels)

    def __getitem__(self, index):
        """Load the image at position ``index`` and return ``(image, label)``."""
        image_path = self.path_and_labels.index[index]
        image = Image.open(image_path).convert("RGB")
        if self.vis_processors is not None:
            image = self.vis_processors(image)

        # Positional lookup: stays a scalar even if the same path appears twice
        # (a label-based .loc lookup would return a Series in that case).
        label = self.path_and_labels["label"].iloc[index]

        return image, label

# Utilities for querying the model on many images at once

from os import listdir
from os.path import isfile, join
from tqdm import tqdm
from sklearn.metrics import accuracy_score, confusion_matrix
import time

class InstructBLIP():
    """Evaluation harness that asks a model yes/no questions about images.

    Typical use: ``LoadModels`` -> ``LoadData`` -> ``QueryImgs_batch`` (or
    ``Query`` for a single image). Label convention used throughout:
    ``0`` = answer equals the "true" string (reported as "real"),
    ``1`` = anything else (reported as "fake").
    """

    def __init__(self, name="blip2_vicuna_instruct_textinv", model_type="vicuna7b", is_eval=True, device="cpu") -> None:
        """Reset all result fields.

        NOTE: the arguments are currently unused because the model-loading
        call below is commented out; attach a model with ``LoadModels``.
        """
        print(f'Loading model...')
        #self.model, self.vis_processors, self.txt_processors = load_model_and_preprocess(name, model_type, is_eval, device)
        self.imgs = []
        self.labels = []

        # QA
        self.question = ""

        # results
        self.acc = None
        self.confusion_mat = None

        self.acc_3class = None
        self.confusion_mat_3class = None

        self.com_acc = None
        self.com_confusion_mat = None
        self.uncom_acc = None
        self.uncom_confusion_mat = None

    def LoadModels(self, model, vis_processors, txt_processors, device):
        """Attach an already-loaded model, its processors and the target device."""
        self.model = model
        self.vis_processors = vis_processors
        self.txt_processors = txt_processors
        self.device = device

    def LoadData(self, roots, labels, batch_size=8, num_workers=8):
        """Build the dataset and dataloader for the given image roots.

        Args:
            roots: a directory path or a list of directory paths.
            labels: a label or a list of labels, one per root.
            batch_size: dataloader batch size (default 8, as before).
            num_workers: dataloader worker processes (default 8, as before).

        Requires ``LoadModels`` to have been called, since the "eval" visual
        processor is used to preprocess the images.
        """
        self.roots = [roots] if isinstance(roots, str) else roots
        self.text_labels = [labels] if isinstance(labels, str) else labels
        self.dataset = TextInvDataset(self.roots, self.text_labels, vis_processors=self.vis_processors["eval"])
        self.dataloader = DataLoader(dataset=self.dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    def QueryImgs_batch(self, question, true_string="yes", logPath='log.txt'):
        """Ask ``question`` for every image in the dataloader and score the answers.

        Args:
            question: prompt sent with every image.
            true_string: answer/label string that maps to class 0; everything
                else maps to class 1.
            logPath: file the detailed report is appended to.

        Returns:
            ``(acc, confusion_mat, ans_list, labels, label_3class)`` where the
            last three are NumPy arrays (``label_3class`` is always empty here).
        """
        self.labels = []
        self.label_3class = []
        self.ans_list = []
        self.question = question
        candidates = ["yes", "no"]

        for image, label in tqdm(self.dataloader):
            image = image.to(self.device)
            questions = [self.question] * image.shape[0]

            samples = {"image": image, "prompt": questions}
            ans = self.model.predict_class(samples=samples, candidates=candidates)
            # NOTE(review): presumably each `a` is a ranking of candidate indices;
            # the lookup below is kept exactly as in the original - confirm
            # against the model's predict_class implementation.
            self.ans_list += [0 if candidates[list(a).index(0)] == true_string else 1 for a in ans]
            self.labels += [0 if lbl == true_string else 1 for lbl in label]

        self.acc = accuracy_score(self.labels, self.ans_list)
        self.confusion_mat = confusion_matrix(self.labels, self.ans_list, labels=[0, 1])

        self.ans_list = np.array(self.ans_list)
        self.labels = np.array(self.labels)
        self.label_3class = np.array(self.label_3class)

        self.PrintResult(detailed=True, logPath=logPath)

        return self.acc, self.confusion_mat, self.ans_list, self.labels, self.label_3class

    def Query(self, image, question):
        """Ask ``question`` about a single PIL image.

        Returns:
            A list with one string per model output row: ``"True"`` when the
            selected candidate is "yes", otherwise ``"Fake"``.
        """
        image = self.vis_processors["eval"](image).unsqueeze(0).to(self.device)

        samples = {"image": image, "prompt": question}
        candidates = ["yes", "no"]
        ans = self.model.predict_class(samples=samples, candidates=candidates)
        pred_label = ["True" if candidates[list(a).index(0)] == "yes" else "Fake" for a in ans]
        return pred_label

    def PrintResult(self, detailed=False, acc=None, confusion_mat=None, ans_list=None, labels=None, logPath=None):
        """Write the stored (or overridden) metrics to a log file or stdout.

        Args:
            detailed: when true, also print run metadata and per-class
                (real / fake) breakdowns.
            acc, confusion_mat, ans_list, labels: optional overrides that
                replace the values stored on the instance. ``None`` means
                "keep the stored value"; ``0.0`` and arrays are accepted.
            logPath: file to append to. When falsy the report goes to stdout.
        """
        # Use identity checks: truth-testing a NumPy array raises ValueError
        # and would silently drop a legitimate accuracy of 0.0.
        if acc is not None:
            self.acc = acc
        if confusion_mat is not None:
            self.confusion_mat = np.asarray(confusion_mat)
        if ans_list is not None:
            self.ans_list = np.asarray(ans_list)
        if labels is not None:
            self.labels = np.asarray(labels)

        if logPath:
            # The context manager closes the file even if printing fails.
            with open(logPath, 'a') as logfile:
                self._write_result(detailed, logfile)
        else:
            # print(file=None) writes to sys.stdout.
            self._write_result(detailed, None)

    def _write_result(self, detailed, logfile):
        """Print the report to ``logfile`` (``None`` means stdout)."""
        if not detailed:
            print(f'Question: {self.question}\n', file=logfile)
            print(f'Acc: {self.acc*100:.2f}%', file=logfile)
            self.PrintConfusion(self.confusion_mat, logfile=logfile)
            print('\n', file=logfile)
            return

        print(f'[TIME]      : {time.ctime()}', file=logfile)
        print(f'[Finetuned] : {self.model.finetuned}', file=logfile)
        print(f'[Img roots] : {self.roots}', file=logfile)
        print(f'[Labels]    : {self.text_labels}', file=logfile)
        print(f'[Question]  : {self.question}\n', file=logfile)

        print(f'=== Overall ===', file=logfile)
        print(f'Acc: {self.acc*100:.2f}%', file=logfile)
        self.PrintConfusion(self.confusion_mat, logfile=logfile)
        print('\n', file=logfile)

        # Boolean-mask indexing below needs arrays, not plain lists.
        ans_arr = np.asarray(self.ans_list)
        label_arr = np.asarray(self.labels)

        if 0 in label_arr:
            real_ans_list = ans_arr[label_arr == 0]
            real_label = [0] * len(real_ans_list)
            self.real_acc = accuracy_score(real_label, real_ans_list)
            self.real_confusion_mat = confusion_matrix(real_label, real_ans_list, labels=[0, 1])
            print(f'=== Real images ===', file=logfile)
            print(f'Acc: {self.real_acc*100:.2f}%', file=logfile)
            self.PrintConfusion(self.real_confusion_mat, logfile=logfile)
            print('\n', file=logfile)
        else:
            print(f'=== No real images ===\n', file=logfile)

        if 1 in label_arr:
            fake_ans_list = ans_arr[label_arr == 1]
            fake_label = [1] * len(fake_ans_list)
            self.com_acc = accuracy_score(fake_label, fake_ans_list)
            self.com_confusion_mat = confusion_matrix(fake_label, fake_ans_list, labels=[0, 1])
            print(f'=== Fake images ===', file=logfile)
            print(f'Acc: {self.com_acc*100:.2f}%', file=logfile)
            self.PrintConfusion(self.com_confusion_mat, logfile=logfile)
            print('\n', file=logfile)
        else:
            print(f'=== No fake images ===\n', file=logfile)

    def PrintConfusion(self, mat, logfile=None):
        """Print a 2x2 confusion matrix as a small text table.

        Args:
            mat: 2x2 matrix, rows = ground truth, columns = prediction.
            logfile: open text file to write to; ``None`` prints to stdout.
        """
        mat = np.asarray(mat)
        padding = ' '
        print(f'        | Pred real | Pred fake |', file=logfile)
        print(f'GT real | {mat[0, 0]:{padding}<{10}}| {mat[0, 1]:{padding}<{11}}|', file=logfile)
        print(f'GT fake | {mat[1, 0]:{padding}<{10}}| {mat[1, 1]:{padding}<{11}}|', file=logfile)

    def MultipleAns(self, ans1, ans2):
        """Combine the answers to two questions and score them against ``self.labels``.

        An image is predicted real (0) only when both answers are 0;
        otherwise it is predicted fake (1).

        Returns:
            ``(acc, confusion_mat, final_ans)``; the same values are also
            stored on the instance.
        """
        # Q1: Is this photo common in real world?
        # Q2: Is this photo generated by a model?
        final_ans = [0 if a1 == 0 and a2 == 0 else 1 for a1, a2 in zip(ans1, ans2)]

        acc = accuracy_score(self.labels, final_ans)
        # Fix the label set so the matrix is always 2x2, even if one class is absent.
        confusion_mat = confusion_matrix(self.labels, final_ans, labels=[0, 1])
        print(f'Accuracy: {acc*100:.2f}%')
        self.PrintConfusion(confusion_mat)

        self.ans_list = final_ans
        self.acc = acc
        self.confusion_mat = confusion_mat

        return acc, confusion_mat, final_ans
    
def print_combine_result(pretrained_ans, finetuned_ans, label, logPath):
    """Combine two sets of 0/1 predictions and append the metrics to a log file.

    A sample is predicted fake (1) when either model predicts 1; this is what
    ``ceil((a + b) / 2)`` yields for 0/1 inputs.

    Args:
        pretrained_ans: 0/1 predictions of the first model (array-like).
        finetuned_ans: 0/1 predictions of the second model (array-like).
        label: ground-truth 0/1 labels (array-like).
        logPath: file the report is appended to.

    Returns:
        ``(comb_acc, comb_confusion_mat, comb_ans)``.
    """
    # Convert up front: with plain lists `+` would concatenate and
    # `label == 0` would be a single bool instead of a mask.
    pretrained_ans = np.asarray(pretrained_ans)
    finetuned_ans = np.asarray(finetuned_ans)
    label = np.asarray(label)

    def _print_confusion(mat, logfile):
        padding = ' '
        print(f'        | Pred real | Pred fake |', file=logfile)
        print(f'GT real | {mat[0, 0]:{padding}<{10}}| {mat[0, 1]:{padding}<{11}}|', file=logfile)
        print(f'GT fake | {mat[1, 0]:{padding}<{10}}| {mat[1, 1]:{padding}<{11}}|', file=logfile)

    def _print_section(title, acc, mat, logfile):
        print(f'=== {title} ===', file=logfile)
        print(f'Acc: {acc*100:.2f}%', file=logfile)
        _print_confusion(mat, logfile=logfile)
        print('\n', file=logfile)

    comb_ans = np.ceil((pretrained_ans + finetuned_ans) / 2).astype(np.int64)

    comb_acc = accuracy_score(label, comb_ans)
    comb_confusion_mat = confusion_matrix(label, comb_ans, labels=[0, 1])

    # The context manager guarantees the log file is closed (it was leaked before).
    with open(logPath, 'a') as logfile:
        _print_section('Overall (Comb)', comb_acc, comb_confusion_mat, logfile)

        # Per-class sections are skipped when the class is absent, mirroring
        # InstructBLIP.PrintResult.
        if 0 in label:
            real_ans_list = comb_ans[label == 0]
            real_label = [0] * len(real_ans_list)
            real_acc = accuracy_score(real_label, real_ans_list)
            real_confusion_mat = confusion_matrix(real_label, real_ans_list, labels=[0, 1])
            _print_section('Real images (Comb)', real_acc, real_confusion_mat, logfile)
        else:
            print(f'=== No real images (Comb) ===\n', file=logfile)

        if 1 in label:
            com_ans_list = comb_ans[label == 1]
            com_label = [1] * len(com_ans_list)
            com_acc = accuracy_score(com_label, com_ans_list)
            com_confusion_mat = confusion_matrix(com_label, com_ans_list, labels=[0, 1])
            _print_section('Common fake images (Comb)', com_acc, com_confusion_mat, logfile)
        else:
            print(f'=== No common fake images (Comb) ===\n', file=logfile)

    return comb_acc, comb_confusion_mat, comb_ans

  from .autonotebook import tqdm as notebook_tqdm
2023-10-20 11:05:14.024764: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# Use the GPU when one is visible to this process, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model, vis_processors, txt_processors = load_model_and_preprocess(name="blip2_vicuna_instruct_textinv", model_type="vicuna7b", is_eval=True, device=device)
# Load the "_cam" model variant together with its visual and text processors.
model, vis_processors, txt_processors = load_model_and_preprocess(name="blip2_vicuna_instruct_textinv_cam", model_type="vicuna7b", is_eval=True, device=device)

print(f'Load model OK!')

tokenizer OK!
visual encoder OK!
Q-former OK!


Loading checkpoint shards: 100%|██████████| 2/2 [00:11<00:00,  5.67s/it]


LLM OK!
Load model OK!


In [4]:
# Release cached, currently unused GPU memory held by PyTorch's allocator.
torch.cuda.empty_cache()

# Single debug sample reused by ad-hoc queries in other cells.
path = "/eva_data0/denny/textual_inversion/debug/1_fake/common/00009.png"
image = Image.open(path)
question = "Is this photo real?"
# NOTE(review): `answer` is not read anywhere in the visible cells.
answer = "no"

# The constructor only prints a message and resets result fields; the model
# and processors loaded earlier are attached explicitly through LoadModels.
instruct = InstructBLIP()
instruct.LoadModels(model, vis_processors, txt_processors, device)

Loading model...


In [12]:
import cv2
import os
from pytorch_grad_cam import GradCAM, HiResCAM, ScoreCAM, GradCAMPlusPlus, AblationCAM, XGradCAM, EigenCAM, FullGrad
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image, preprocess_image

def reshape_transform(tensor, height=16, width=16):
    """Turn a token sequence into a CNN-style feature map for Grad-CAM.

    The first token along dimension 1 is dropped (presumably the class
    token), the remaining ``height * width`` tokens are laid out on a 2-D
    grid, and the feature dimension is moved in front of the spatial ones.

    Args:
        tensor: 3-D tensor indexed as (batch, tokens, features).
        height: number of grid rows.
        width: number of grid columns.

    Returns:
        Tensor indexed as (batch, features, height, width).
    """
    batch_size, n_features = tensor.size(0), tensor.size(2)
    patch_tokens = tensor[:, 1:, :]
    grid = patch_tokens.reshape(batch_size, height, width, n_features)
    # Channels-last -> channels-first, like in CNNs.
    return grid.permute(0, 3, 1, 2)

def resize_crop_image(rgb_img, image_size=224):
    """Resize so the shorter side equals ``image_size``, then center-crop a square.

    Args:
        rgb_img: image array indexed as (height, width[, channels]).
        image_size: side length of the square output in pixels.

    Returns:
        The cropped image. It is a view (no copy) of either the resized
        array or, when no resize was needed, of the input array.
    """
    height, width = rgb_img.shape[:2]

    shorter_side = min(height, width)
    if shorter_side != image_size:
        # Scale by the requested size (this was hard-coded to 224 before,
        # so any other `image_size` produced a wrong crop).
        resize_ratio = image_size / shorter_side
        rgb_img = cv2.resize(rgb_img, None, fx=resize_ratio, fy=resize_ratio)
        height, width = rgb_img.shape[:2]

    # Center crop. Computing the start offset directly yields exactly
    # `image_size` pixels for odd sizes as well; clamping at 0 prevents a
    # negative start from wrapping around.
    top = max((height - image_size) // 2, 0)
    left = max((width - image_size) // 2, 0)
    return rgb_img[top:top + image_size, left:left + image_size]

cam_model = instruct.model
# Grad-CAM back-propagates into the visual encoder, so its parameters must
# track gradients.
for param in cam_model.visual_encoder.parameters():
    param.requires_grad = True

# model_type = "SD2"
# file_idx = 16503
# path = f'/eva_data0/denny/textual_inversion/60k_6k_6k/test/1_fake/SD2/commonFake_COCO/{file_idx}.png'

# model_type = "SD2IP"
# file_idx = '000000233526'
# path = f'/eva_data0/denny/textual_inversion/60k_6k_6k/test/1_fake/SD2Inpaint_224/COCO_train2014_{file_idx}_crop000_inpainted_0.png'

model_type = "SDXLIP"
file_idx = '000000233526'
path = f'/eva_data0/iammingggg/textual_inversion/60k_6k_6k/test/1_fake/SDXLInpaint/SDXLInpainted_binaryMask/COCO_train2014_{file_idx}_inpainted.png'

out_dir = os.path.join('cam_results', f'{model_type}_{file_idx}')
os.makedirs(out_dir, exist_ok=True)

# PIL already decodes to RGB. The previous `[:, :, ::-1]` flip (needed only
# for images read with cv2.imread) turned the image into BGR, so the model
# was fed swapped channels.
rgb_img = np.array(Image.open(path).convert("RGB"))

rgb_img = resize_crop_image(rgb_img)
rgb_img = np.float32(rgb_img) / 255
# NOTE(review): confirm these statistics match the normalization applied by
# vis_processors["eval"], which is what the model sees during evaluation.
input_tensor = preprocess_image(
    rgb_img,
    mean=[0.4730, 0.4499, 0.4129],
    std=[0.2780, 0.2713, 0.2872],
)

# image = Image.open(path)
# image1 = instruct.vis_processors["eval"](image).unsqueeze(0).to(instruct.device)
# with torch.no_grad():
#     with instruct.model.maybe_autocast():
#         print(cam_model(image1)[0].shape)

# yes_yes = instruct.model.all_logits[0][0, 1, 4874]
# yes_no = instruct.model.all_logits[0][0, 1, 694]
# no_yes = instruct.model.all_logits[0][1, 1, 4874]
# no_no = instruct.model.all_logits[0][1, 1, 694]

# print(yes_yes)
# print(yes_no)
# print(no_yes)
# print(no_no)

exp = "neg_loss"

n_blocks = len(cam_model.visual_encoder.blocks)


def _save_gradcam(target_layers, file_name, message):
    """Run Grad-CAM on `input_tensor` for `target_layers` and save the overlay in `out_dir`."""
    with GradCAM(model=cam_model, target_layers=target_layers, use_cuda=True, reshape_transform=reshape_transform) as cam:
        with instruct.model.maybe_autocast():
            grayscale_cam = cam(input_tensor=input_tensor)
        grayscale_cam = grayscale_cam[0, :]
        visualization = show_cam_on_image(rgb_img, grayscale_cam, use_rgb=True)
        # The overlay is RGB (use_rgb=True) but cv2.imwrite expects BGR.
        cv2.imwrite(os.path.join(out_dir, file_name), cv2.cvtColor(visualization, cv2.COLOR_RGB2BGR))
        print(message)


# Last block
block_idx = n_blocks - 1
_save_gradcam(
    [cam_model.visual_encoder.blocks[block_idx].norm1],
    f'cam_block[{block_idx}]_norm1_{exp}.png',
    f'Save GradCAM for block [{block_idx}].',
)

# GradCAM for every 5 blocks (file name and message now follow the same
# pattern as the last-block run; the `_norm1` suffix used to be in the
# message instead of the file name).
for block_idx in range(0, n_blocks, 5):
    _save_gradcam(
        [cam_model.visual_encoder.blocks[block_idx].norm1],
        f'cam_block[{block_idx}]_norm1_{exp}.png',
        f'Save GradCAM for block [{block_idx}].',
    )

# GradCAM over the norm1 layers of all blocks at once
target_layers = [cam_model.visual_encoder.blocks[block_idx].norm1 for block_idx in range(n_blocks)]
_save_gradcam(
    target_layers,
    f'cam_block_all_norm1_{exp}.png',
    f'Save GradCAM for all blocks.',
)

tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [38].
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [0]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [5]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [10]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [15]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [20]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [25]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [30]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for block [35]_norm1.
tensor([[   2, 4874],
        [   2,  694]], device='cuda:0')
Save GradCAM for all blocks.


In [6]:
# Import the omnixai classes under aliases. A bare `Image` / `GradCAM` import
# would rebind the notebook-wide names that other cells use for PIL's `Image`
# (`Image.open(...)`) and pytorch_grad_cam's `GradCAM`.
from omnixai.data.image import Image as OmniImage
from omnixai.explainers.vision.specific.gradcam.pytorch.gradcam import GradCAM as OmniGradCAM

# Class names shown in the plot, indexed by class id.
idx2label = ["yes", "no"]

# `image` is the PIL image opened in an earlier cell.
input_tensor = instruct.vis_processors["eval"](image).unsqueeze(0).to(instruct.device)
cam_model = instruct.model
# Hook the first layer norm of the last visual-encoder block.
target_layers = cam_model.visual_encoder.blocks[-1].norm1
explainer = OmniGradCAM(
    model=cam_model,
    target_layer=target_layers
)
# Explain the top label
# NOTE(review): confirm that this explainer accepts a preprocessed tensor
# rather than an omnixai Image plus a preprocess function.
explanations = explainer.explain(input_tensor)
explanations.ipython_plot(index=0, class_names=idx2label)

In [9]:
# Sanity check: softmax of the logits [-1, 1] along the last dimension.
t = torch.tensor([[-1, 1]], dtype=torch.float)
torch.nn.functional.softmax(t, dim=-1)

tensor([[0.1192, 0.8808]])

In [5]:
# File that QueryImgs_batch appends its detailed report to.
logPath = '/home/denny/LAVIS/deepfake-detection/log/log.txt'
# logPath = '/home/denny/LAVIS/deepfake-detection/log/log2.txt'
# logPath = '/home/denny/LAVIS/deepfake-detection/log/SD2_SD2IP_90k_lora_onlyCommon.txt'
# logPath = '/home/denny/LAVIS/deepfake-detection/log/SD2_SD2IP_90k_lora_onlyCommon2.txt'

# Prompts: q1 is the plain question, q2 additionally contains the "[*]" token.
q1 = "Is this photo real?"
q2 = "Is this photo real [*]?"

# Open in append mode and close immediately: creates the log file if it does
# not exist yet (the parent directory must already exist).
file = open(logPath, 'a')
file.close()

# Fresh harness with the already-loaded model attached.
instruct = InstructBLIP()
instruct.LoadModels(model, vis_processors, txt_processors, device)

print(f'Log path: {logPath}')
print(f'Q1: {q1}')
# print(f'Q2: {q2}')

# csvfiles = [
# # "/eva_data0/denny/textual_inversion/debug_label.csv",
# "/eva_data0/denny/textual_inversion/60k_6k_6k/test_COCO_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_Flickr_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_SD2_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_SDXL_label.csv", 
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_IF_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_DALLE_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_SGXL_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_Control_COCO_label.csv",
# # "/eva_data0/iammingggg/textual_inversion/60k_6k_6k/test_lama_label.csv",
# # "/eva_data0/iammingggg/textual_inversion/60k_6k_6k/test_SD2IP_label.csv",
# # "/eva_data0/iammingggg/textual_inversion/60k_6k_6k/test_lte_label.csv",
# # "/eva_data0/iammingggg/textual_inversion/60k_6k_6k/test_SD2SR_label.csv",
# # "/eva_data0/iammingggg/textual_inversion/60k_6k_6k/test_deeperforensics_faceOnly_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_AdvAtk_Imagenet_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_Backdoor_Imagenet_label.csv",
# # "/eva_data0/denny/textual_inversion/60k_6k_6k/test_DataPoison_Imagenet_label.csv",
# ]

# (image root, ground-truth answer) pairs; each pair is evaluated on its own.
# NOTE(review): both entries are the same `0_real` root with label "yes", so
# the same evaluation runs twice - presumably the second entry was meant to
# be a fake-image root with label "no"; confirm.
roots_and_labels = [
    ["/eva_data0/denny/textual_inversion/debug/0_real/", "yes"],
    ["/eva_data0/denny/textual_inversion/debug/0_real/", "yes"],
]

for root, label in roots_and_labels:
    # Rebuilds the dataset and dataloader for this root.
    instruct.LoadData(roots=root, labels=label)

    question = q1
    # Also appends a detailed report to `logPath`; `label_3class` is always empty.
    acc, confusion_mat, pretrained_ans_list, labels, label_3class = instruct.QueryImgs_batch(question=question, true_string="yes", logPath=logPath)
    print(f'Question: {question}')
    print(f'     Acc: {acc*100:.2f}%\n')

    # question = q2
    # acc, confusion_mat, finetuned_ans_list, labels, label_3class = instruct.QueryImgs_batch(question=question, true_string="yes", logPath=logPath)
    # print(f'Question: {question}')
    # print(f'Acc: {acc*100:.2f}%')

    # comb_acc, comb_confusion_mat, comb_ans = print_combine_result(pretrained_ans_list, finetuned_ans_list, labels, logPath=logPath)
    # print(f'[Combination]')
    # print(f'Acc: {comb_acc*100:.2f}%')

Loading model...
Log path: /home/denny/LAVIS/deepfake-detection/log/log.txt
Q1: Is this photo real?
Found 100 images with label "yes".


  0%|          | 0/13 [00:01<?, ?it/s]


In [None]:
# Query the model about a single debug image.
# NOTE(review): `Image` must be PIL's `Image` here; re-import it if another
# cell has rebound the name.
path = "/eva_data0/denny/textual_inversion/debug/1_fake/common/00009.png"
image = Image.open(path)
# Returns a one-element list containing "True" or "Fake".
ans = instruct.Query(image, q1)
print(ans)

In [3]:
class TextInvDataset(Dataset):
    """Dataset backed by a CSV file with an ``img_path`` and a ``label`` column.

    Items are returned as ``(image, label, is_uncommon)``.
    """

    def __init__(self, csv, vis_processors=None, txt_processors=None):
        """Read the CSV and index it by image path.

        Args:
            csv: path to a CSV file containing ``img_path`` and ``label`` columns.
            vis_processors: optional callable applied to each loaded PIL image.
            txt_processors: stored on the instance; not used by this class.
        """
        self.path_and_labels = pd.read_csv(csv, index_col="img_path")
        self.vis_processors = vis_processors
        self.txt_processors = txt_processors

    def __len__(self):
        """Return the number of rows in the CSV."""
        # len() of the frame is O(1); no need to materialize the index as a list.
        return len(self.path_and_labels)

    def __getitem__(self, index):
        """Load row ``index`` and return ``(image, label, is_uncommon)``."""
        image_path = self.path_and_labels.index[index]
        image = Image.open(image_path).convert("RGB")
        if self.vis_processors is not None:
            image = self.vis_processors(image)

        # Positional lookup: stays a scalar even if the same path is listed
        # twice (a label-based .loc lookup would return a Series then).
        label = self.path_and_labels["label"].iloc[index]

        # Flag samples whose path contains the substring "uncommon".
        is_uncommon = "uncommon" in image_path

        return image, label, is_uncommon