In [None]:
import torch

def load_embeddings(cache_file, dataset):
    """
    Read cached embeddings from ``cache_file`` and unpack them split by split.

    The file is deserialized with ``torch.load`` and must provide one entry
    for every combination of split (train, val, test) and field (features,
    labels, groups, domains, filenames).

    Args:
        cache_file: location handed straight to ``torch.load``.
        dataset: accepted for call-site compatibility; not used in this function.

    Returns:
        A flat 15-tuple ordered split-major: the five train entries, then the
        five val entries, then the five test entries, each in the field order
        listed above.

    Raises:
        KeyError: if one of the fifteen expected keys is missing.
    """
    cached = torch.load(cache_file)
    # Lookup order defines the order of the returned tuple.
    ordered_keys = (
        'train_features', 'train_labels', 'train_groups', 'train_domains', 'train_filenames',
        'val_features', 'val_labels', 'val_groups', 'val_domains', 'val_filenames',
        'test_features', 'test_labels', 'test_groups', 'test_domains', 'test_filenames',
    )
    return tuple(cached[key] for key in ordered_keys)


In [None]:
# Dataset identifier that is passed along to load_embeddings.
dataset = "CUB"
# Path of the cached embedding file read by the cells below.
cache_file = "embeddings/CUB/clip_openai_ViT-L_14.pt"

In [None]:
# Unpack the fifteen cached entries (train / val / test x features, labels,
# groups, domains, filenames) in the order load_embeddings returns them.
# load_embeddings already reads cache_file, so the file is not deserialized
# a second time in this cell.
(
    train_features, train_labels, train_groups, train_domains, train_filenames,
    val_features, val_labels, val_groups, val_domains, val_filenames,
    test_features, test_labels, test_groups, test_domains, test_filenames,
) = load_embeddings(cache_file, dataset)

# Views of the test features / domains with the first 32 entries dropped.
test_features_short = test_features[32:]
test_domains_short = test_domains[32:]

# Quick look at the test split.
print("shape of test_features:", test_features.shape)
print("len of test labels:", len(test_labels))
print(test_labels)
print(test_groups)
print(test_domains)

# Materialize the domain ids once and reuse the list for both counts.
test_domain_list = list(test_domains)
num_zeros = test_domain_list.count(0)
num_ones = test_domain_list.count(1)
print("Number of ones:", num_ones)
print("Number of zeros:", num_zeros)

In [None]:
import numpy as np

def test_process(test_val,few_target_domain_index):
    target_domain_val,source_domain_val = test_val[:3047],test_val[3047:]
    few_target_domain_val = np.take(target_domain_val, few_target_domain_index, axis=0)
    new_target_domain_val = np.delete(target_domain_val, few_target_domain_index, axis=0)
    new_test_val = np.concatenate((new_target_domain_val,source_domain_val),axis=0)
    return few_target_domain_val,new_test_val

def get_few_target_domain_index(test_labels, num_target=3047):
    """
    Return the index of the first target-domain sample of every class.

    Only the first ``num_target`` labels (the target-domain part of the test
    set) are inspected. Classes are taken from the label values that actually
    occur, so gaps in the label ids do not cause later classes to be skipped.

    Args:
        test_labels: 1-D sequence of integer class labels; anything
            ``np.asarray`` can convert (list, ndarray, CPU tensor).
        num_target: number of leading labels that belong to the target domain.
            Defaults to 3047, the value that used to be hard-coded here.

    Returns:
        A list with one index per class present, ordered by ascending class
        id; each index is relative to the start of ``test_labels``.
    """
    target_labels = np.asarray(test_labels[:num_target])
    # tolist() yields plain Python scalars, so the set really deduplicates
    # (elements of a tensor, for example, would hash by identity instead).
    present_classes = sorted(set(target_labels.tolist()))
    few_target_domain_index = []
    for class_id in present_classes:
        class_index = np.where(target_labels == class_id)[0]
        # class_id came from the labels, so at least one match exists.
        few_target_domain_index.append(class_index[0])
    return few_target_domain_index

# One representative sample index per class, taken from the test labels.
few_target_domain_index = get_few_target_domain_index(test_labels)
print(few_target_domain_index)

# Gather the features at those indices; the reduced test set is discarded here.
few_target_test_features, _ = test_process(test_features, few_target_domain_index)

# Spot-check a single row (index 199) of the selected features.
print(few_target_test_features[199])


In [None]:
import os
import glob

folder_path = './data/CUB-200-Painting'
total_count = 0
category_counts = {}

# Number of category folders that contain more than 5 images.
category_over_5_count = 0

# Sorted so the per-category report is printed in a reproducible order;
# glob itself gives no ordering guarantee.
for category_folder in sorted(glob.glob(os.path.join(folder_path, '*'))):
    # Only directories are categories; skip anything else.
    if not os.path.isdir(category_folder):
        continue

    category_name = os.path.basename(category_folder)

    # glob already returned the folder's path, so it is used directly.
    image_count = len(glob.glob(os.path.join(category_folder, '*.jpg')))

    total_count += image_count
    category_counts[category_name] = image_count

    # Count the category folders holding more than 5 images.
    if image_count > 5:
        category_over_5_count += 1

# Alias under the previous (misleading) name: the threshold is 5, not 10.
category_with_10plus_count = category_over_5_count

print("总图片数量:", total_count)
print("每个类别文件夹的图片数量:")
for category_name, image_count in category_counts.items():
    print(category_name, ":", image_count)

print("大于5张照片的类别文件夹个数:", category_with_10plus_count)

In [None]:
import os
import shutil

source_folder = "./data/CUB-200-Painting"
target_folder = "./data/CUB-Few-Painting"

os.makedirs(target_folder, exist_ok=True)

# scandir is used as a context manager so the directory handle is released
# as soon as the listing has been consumed.
with os.scandir(source_folder) as entries:
    subfolders = sorted(entry.path for entry in entries if entry.is_dir())

for subfolder in subfolders:
    folder_name = os.path.basename(subfolder)

    with os.scandir(subfolder) as entries:
        image_files = [entry.path for entry in entries
                       if entry.is_file() and entry.name.endswith(".jpg")]

    # Skip categories without any .jpg so no empty class folder is left behind
    # in the few-shot copy.
    # NOTE(review): torchvision's ImageFolder is expected to reject class
    # folders that contain no valid image; confirm for the installed version.
    if not image_files:
        continue

    target_subfolder = os.path.join(target_folder, folder_name)
    os.makedirs(target_subfolder, exist_ok=True)

    # The lexicographically smallest path is the "first" image of the class.
    first_image = min(image_files)
    target_image = os.path.join(target_subfolder, os.path.basename(first_image))
    shutil.copy(first_image, target_image)

print("success")

In [None]:
import torchvision
from PIL import Image

def luma_transform(input):
    """
    Replace the three colour channels of an image by their weighted sum.

    Args:
        input: tensor indexed as (channel, row, column) with exactly three
            channels, read in the order R, G, B.

    Returns:
        A tensor with the same three-channel layout in which every channel
        holds ``0.299 * R + 0.587 * G + 0.114 * B``. The result is an expanded
        view, so the three channels share the same underlying values.

    Raises:
        AssertionError: if the first dimension of ``input`` is not 3.
    """
    assert input.shape[0] == 3, "Input must have 3 channels"
    channel_weights = (0.299, 0.587, 0.114)
    weighted = [input[channel, :, :] * weight
                for channel, weight in enumerate(channel_weights)]
    # Summed left to right in R, G, B order.
    luma = weighted[0] + weighted[1] + weighted[2]
    return luma.unsqueeze(0).expand(3, -1, -1)

class Cub2011Painting(torchvision.datasets.ImageFolder):
    """
    ImageFolder variant that returns luma-only 512x512 images as dicts.

    ``__getitem__`` does its own loading and preprocessing; it does not use
    the ``transform`` passed to the constructor.
    """

    def __getitem__(self, idx):
        """
        Load sample ``idx`` and convert it to a normalized luma image.

        Returns:
            dict with
              "image": float tensor of shape (3, 512, 512) scaled to [-1, 1],
                       every channel holding the same luma values,
              "label": class index stored in ``self.samples``,
              "filename": path of the image file.
        """
        path, target = self.samples[idx]

        # The context manager closes the file handle. convert("RGB") gives a
        # 3-channel image even for grayscale / palette / RGBA files, which the
        # permute and luma steps below require.
        with Image.open(path) as raw:
            # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
            img = raw.convert("RGB").resize((512, 512), Image.LANCZOS)

        # HWC uint8 -> CHW float in [0, 1], then rescale to [-1, 1].
        img = np.array(img) / 255
        img = torch.from_numpy(img).type(torch.FloatTensor).permute(2, 0, 1)
        img = (img - 0.5) * 2

        img = luma_transform(img)

        return {
            "image": img,
            "label": target,
            "filename": path,
        }

In [None]:
import torch
from tqdm import tqdm
import numpy as np

# Pick the compute device (GPU when one is available).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset plus a loader that yields one sample at a time, in folder order.
testset = Cub2011Painting('./data/CUB-Few-Painting', transform=None)
test_loader = torch.utils.data.DataLoader(testset, shuffle=False, batch_size=1)

# Smoke test: fetch only the first batch and report the image tensor's shape.
with torch.no_grad():
    for step, batch in enumerate(test_loader):
        image = batch['image']
        label = batch['label']
        filename = batch['filename']
        print(image.shape)
        break



## 2023-11-12 test calculate gray image embedding

In [None]:
import os
import torch
import clip
import numpy as np
import random
from datasets.cub import Cub2011Painting
from clip_utils import get_features

# Seed every RNG in use so the run is repeatable.
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

# Load CLIP ViT-L/14 once. Both names are bound to the same model so that
# code referring to either `model` or `clip_model` keeps working without
# holding two copies of the weights in memory.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model, preprocess = clip.load("ViT-L/14", device)
clip_model = model

# Dataset of style-removed paintings, preprocessed with CLIP's own transform.
dataset = Cub2011Painting('./data/CUB-RemovedStyle-Painting', transform=preprocess)
loader = torch.utils.data.DataLoader(dataset, batch_size=256, shuffle=False)

features, labels, groups, domains, filenames = get_features(loader, model, device, model_type='clip')

save_dict = {
        "removed_style_features": features, "removed_style_labels": labels, "removed_style_groups": groups, "removed_style_domains": domains, "removed_style_filenames": filenames,
        "seed": 0
    }

# NOTE(review): this file name spells "Vit" while the other cache file in this
# notebook spells "ViT"; confirm which spelling downstream code expects.
cache_file = './embeddings/CUB/clip_openai_Vit-L_14_remove_style.pt'
# Ensure the output directory exists before writing.
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
torch.save(save_dict, cache_file)
print(f"Saved CLIP embeddings to {cache_file}")



In [None]:
# Read the cache file back and spot-check one stored feature entry (index 199).
save_dict = torch.load(cache_file)
removed_style_features = save_dict['removed_style_features']
print(removed_style_features[199])

## test L1 loss

In [None]:
import torch
import torch.nn as nn

# Build the L1 loss criterion.
l1_loss = nn.L1Loss()

# Two example inputs: [1, 2, 3] and its element-wise double [2, 4, 6].
input1 = torch.arange(1, 4, dtype=torch.float32)
input2 = 2 * input1

# Evaluate the L1 loss between the two tensors.
loss = l1_loss(input1, input2)

# Show the resulting loss value.
print(loss)

## Test Optuna

In [None]:
import optuna

def objective(trial):
    """
    Toy Optuna objective: sample x and y from [-10, 10] and score (x + y)^2.

    Args:
        trial: object exposing ``suggest_float(name, low, high)``, normally an
            ``optuna.trial.Trial``.

    Returns:
        ``(x + y) ** 2`` for the sampled values.
    """
    # suggest_float replaces the deprecated suggest_uniform; with no step/log
    # arguments it samples from the same continuous uniform range.
    x = trial.suggest_float('x', -10, 10)
    y = trial.suggest_float('y', -10, 10)
    return (x + y) ** 2

# Run 100 trials, searching for the parameters that maximise the objective.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100)

# Report the best parameters first, then the objective value they reached.
for best_result in (study.best_params, study.best_value):
    print(best_result)

## Test BLIP2 Caption

In [None]:
import requests
from PIL import Image

# Open one sample painting from disk as a 3-channel RGB image.
sample_image_path = "./data/CUB-200-Painting/004.Groove_billed_Ani/Groove_billed_Ani_0.jpg"
image = Image.open(sample_image_path).convert('RGB')
# Alternative source: fetch the image from a URL instead.
# url = 'https://media.newyorker.com/cartoons/63dc6847be24a6a76d90eb99/master/w_1160,c_limit/230213_a26611_838.jpg'
# image = Image.open(requests.get(url, stream=True).raw).convert('RGB')

# Show a 596x437 preview inline; `display` comes from the IPython session.
preview = image.resize((596, 437))
display(preview)

In [None]:
from transformers import AutoProcessor, Blip2ForConditionalGeneration
import torch

# Local checkpoint directory shared by the processor and the model.
blip2_checkpoint = "./blip2-opt-2.7b"

# Input processor for BLIP-2, then the model itself in half precision.
processor = AutoProcessor.from_pretrained(blip2_checkpoint)
model = Blip2ForConditionalGeneration.from_pretrained(
    blip2_checkpoint,
    torch_dtype=torch.float16,
)

print("success")

In [None]:
# Prefer GPU index 3 (the original choice). Fall back to the first GPU when
# the host has fewer than four, and to the CPU when CUDA is unavailable, so
# the cell does not fail with an invalid device ordinal.
if torch.cuda.is_available():
    device = "cuda:3" if torch.cuda.device_count() > 3 else "cuda:0"
else:
    device = "cpu"
model.to(device)

# Text prefix the caption is generated from.
prompt = "a painting of"

# Preprocess image + prompt and move the tensors to the model's device,
# casting floating-point inputs to half precision to match the model.
inputs = processor(image, text=prompt, return_tensors="pt").to(device, torch.float16)

# Generate at most 100 new tokens and decode the first (only) sequence.
generated_ids = model.generate(**inputs, max_new_tokens=100)
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
print(prompt +" "+generated_text)

In [None]:
import os
import csv
import requests
from PIL import Image
from transformers import AutoProcessor, Blip2ForConditionalGeneration
import torch

# Initialize the Blip2 model and processor
processor = AutoProcessor.from_pretrained("./blip2-opt-2.7b")
model = Blip2ForConditionalGeneration.from_pretrained("./blip2-opt-2.7b", torch_dtype=torch.float16)

# Prefer GPU index 3 (the original choice). Fall back to the first GPU when
# the host has fewer than four, and to the CPU when CUDA is unavailable, so
# the cell does not fail with an invalid device ordinal.
if torch.cuda.is_available():
    device = "cuda:3" if torch.cuda.device_count() > 3 else "cuda:0"
else:
    device = "cpu"
model.to(device)

# Define the prompt
prompt = "a painting of"

# Collect every .jpg / .png below the dataset directory.
directory = "./data/CUB-200-Painting"

image_paths = []
for root, _dirs, files in os.walk(directory):
    for file in files:
        if file.endswith((".jpg", ".png")):
            image_paths.append(os.path.join(root, file))

# Sorted so captions are produced and stored in a reproducible order.
image_paths = sorted(image_paths)

# CSV file path to store the results
csv_file = "./data/CUB-200-Painting/image_captions.csv"

# Rows of [image path, caption], filled in the loop below.
image_captions = []

# Process each image and generate captions
for image_path in image_paths:
    # The context manager closes the file handle once the RGB copy exists;
    # otherwise one handle per image stays open until garbage collection.
    with Image.open(image_path) as raw_image:
        image = raw_image.convert('RGB')
    inputs = processor(image, text=prompt, return_tensors="pt").to(device, torch.float16)
    generated_ids = model.generate(**inputs, max_new_tokens=100)
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
    generated_text = prompt + " " + generated_text
    image_captions.append([image_path, generated_text])
    print("sucess!")

# Write the image paths and captions to the CSV file. The encoding is set
# explicitly so the output does not depend on the platform's default codec.
with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['Image Path', 'Caption'])
    writer.writerows(image_captions)

print("Image captions generated and saved to", csv_file)