In [None]:
""" 
Fine-tune an image captioning (description) model on artwork description data to
obtain a model that can analyze an artwork and generate a description of it that
is, hopefully, more art-review-like than what a generic image captioning model
would produce.

The pretrained model used here is the Salesforce BLIP image captioning model:
https://huggingface.co/Salesforce/blip-image-captioning-base

The fine tuning data set is the data from the SemArt Project: 
https://github.com/noagarcia/SemArt
"""

In [None]:
import os
import torch
import pandas as pd
import chardet
from PIL import Image
from torchvision import transforms
from datasets import Dataset, DatasetDict
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from datasets import Dataset as HFDataset, DatasetDict, IterableDataset
from transformers import BlipProcessor, BlipForConditionalGeneration, ViTImageProcessor, VisionEncoderDecoderModel, AutoTokenizer, TrainingArguments, Trainer

In [None]:
def list_files_in_directory(dir_path):
    """Return the names of the JPEG files located directly inside ``dir_path``.

    Args:
        dir_path: Path of the directory to scan (sub-directories are not walked).

    Returns:
        A sorted list of entry names whose extension is ``.jpg`` (compared
        case-insensitively). When ``dir_path`` does not exist or is not a
        directory, a warning is emitted and an empty list is returned, so the
        result can always be iterated as a list of file names.
    """
    import warnings  # local import keeps this cell runnable on its own

    try:
        entries = os.listdir(dir_path)
    except FileNotFoundError:
        # Best effort is kept (no exception escapes), but the result stays a
        # list: returning the message itself would be iterated char by char.
        warnings.warn(f"Directory not found: {dir_path}")
        return []
    except NotADirectoryError:
        warnings.warn(f"Not a directory: {dir_path}")
        return []

    # Test the extension rather than a substring, so that names such as
    # "photo.jpg.bak" are not mistaken for images.
    return sorted(name for name in entries if name.lower().endswith('.jpg'))

In [None]:
import pandas as pd
import torch
from torchvision import transforms
from PIL import Image
import os

# Step 1: Locate the SemArt data and load the training captions.
# The dataset root can be overridden through the SEMART_DIR environment
# variable; the fallback is the path this notebook was originally written with.
semart_dir = os.environ.get("SEMART_DIR", "/Users/rckyi/Documents/Datasette/SemArtData/SemArt")
images_dir = semart_dir

description_file_train = semart_dir + '/semart_train.csv'
description_file_test = semart_dir + '/semart_test.csv'

# Detect the text encoding from the raw bytes instead of assuming one.
print(f'file path {description_file_train}')
with open(description_file_train, 'rb') as file:
    result = chardet.detect(file.read())
# chardet reports None when it cannot make a guess; fall back to UTF-8 then.
encoding = result['encoding'] or 'utf-8'
print(f'encoding {encoding}')

# The file is parsed as tab-separated; the IMAGE_FILE and DESCRIPTION columns
# are the ones used by the rest of the notebook.
df_train = pd.read_csv(description_file_train, encoding=encoding, sep='\t')
print(type(df_train['IMAGE_FILE'][0]))
# df_test = pd.read_csv(description_file_test, encoding = "utf-8")

# Lookup table: image file name -> description
image_to_description = dict(zip(df_train["IMAGE_FILE"], df_train["DESCRIPTION"]))

# Image preprocessing applied to every picture before it is handed to a model
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to a fixed shape
    transforms.ToTensor(),  # Convert image to tensor
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize
])

# Step 2: Define a generator to load images and descriptions lazily
# def data_generator(images_name_list):
#     for image_name in images_name_list:
#         image_path = os.path.join(image_dir, image_name)

#         if image_name in image_to_description and os.path.exists(image_path):
#             # Load and transform the image
#             image = Image.open(image_path).convert("RGB")
#             image_tensor = transform(image)

#             # Yield image name, image tensor, and description
#             yield image_name, image_tensor, image_to_description[image_name]

# # Step 3: Iterate through the generator and build the dictionary
# data_var = {img_name: [img_tensor, desc] for img_name, img_tensor, desc in data_generator(["image1.jpg", "image2.jpg", "image3.jpg"])}

# # Print a sample entry
# print(next(iter(data_var.items())))

In [None]:
# # Step 1: Load the CSV file into a dictionary mapping image names to descriptions
# description_file = "path/to/description.csv"
# df = pd.read_csv(description_file)
# Names of the .jpg files found in the dataset's Images/ folder
image_files_in_dir = list_files_in_directory(f"{semart_dir}/Images/")
# image_files_in_dir
# list(image_to_description.items())[0]

In [None]:
def data_generator(images_name_list):
    """Lazily yield ``(image_name, image_tensor, description)`` triples.

    Args:
        images_name_list: Iterable of image file names to look at.

    Yields:
        One triple per name that has an entry in ``image_to_description``;
        names without a description are skipped. Each image is opened from
        ``{semart_dir}/Images/``, converted to RGB and passed through the
        module-level ``transform``.
    """
    for image_name in images_name_list:
        # image_to_description is keyed by the IMAGE_FILE column, so a dict
        # lookup replaces scanning the whole column once per image.
        if image_name not in image_to_description:
            continue

        image_path = f"{semart_dir}/Images/{image_name}"
        # The context manager releases the file handle as soon as the pixels
        # have been converted, instead of leaving it to garbage collection.
        with Image.open(image_path) as image:
            image_tensor = transform(image.convert("RGB"))
        yield image_name, image_tensor, image_to_description[image_name]

# Step 3: Drain the generator into a dict: image name -> [image tensor, description]
data_var = dict(
    (name, [tensor, text])
    for name, tensor, text in data_generator(image_files_in_dir)
)

# # Print a sample entry


In [None]:
# for i, b in data_var.items():
#     print(f'i {i}, b {b}')

In [None]:
from datasets import IterableDataset, DatasetDict
from transformers import BlipProcessor, BlipForConditionalGeneration, TrainingArguments, Trainer
import torch

def convert_dict_to_hf_dataset(data_dict):
    """
    Wrap a ``{img_name: [img_tensor, desc]}`` dictionary as a streaming Hugging Face dataset.

    The image names are discarded; every example is a dict with the keys
    "image" (the tensor) and "text" (the description). The stream is returned
    inside a DatasetDict under the "train" key.
    """
    def iter_examples():
        # Only the values are needed: each one is an (image tensor, description) pair
        for pair in data_dict.values():
            img_tensor, desc = pair
            yield {"image": img_tensor, "text": desc}

    # from_generator keeps the data lazy: examples are produced on iteration
    train_stream = IterableDataset.from_generator(iter_examples)
    return DatasetDict({"train": train_stream})


In [None]:

# Wrap the in-memory {image name: [tensor, description]} dict as a Hugging Face
# dataset; the result is indexed with ["train"] when the Trainer is built.
hf_dataset = convert_dict_to_hf_dataset(data_var)



In [None]:
# Define Training Arguments (short CPU-only run for the BLIP model)
training_args = TrainingArguments(
    output_dir="./blip-finetuned",
    per_device_train_batch_size=8,
    num_train_epochs=5,
    save_steps=500,
    # evaluation_strategy is intentionally not passed: the default already
    # disables evaluation, whereas an explicit None is not a valid strategy
    # value on transformers releases that expect a string here.
    save_total_limit=2,
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_dir="./logs",
    push_to_hub=False,  # Set to True if uploading to Hugging Face Hub
    logging_steps=10,
    # fp16 mixed precision needs a CUDA device, and this run is forced onto the
    # CPU by no_cuda=True, so it must stay off even on a machine with a GPU.
    fp16=False,
    # A positive max_steps takes precedence over num_train_epochs; it also
    # bounds the run for a streaming dataset that has no length.
    max_steps=10,
    no_cuda=True,
    # The dataset columns ("image", "text") are not arguments of the model's
    # forward(); keep them so they can be turned into model inputs at batching time.
    remove_unused_columns=False,
)



In [None]:
# Pretrained BLIP captioning checkpoint: fetch the weights and the matching processor
model_name = "Salesforce/blip-image-captioning-base"
model = BlipForConditionalGeneration.from_pretrained(model_name)
processor = BlipProcessor.from_pretrained(model_name)

In [None]:
# Define Trainer
# The training stream yields {"image": tensor, "text": str}. Neither key is an
# argument of the model's forward(), so the batch has to be built explicitly.
MAX_CAPTION_TOKENS = 128  # descriptions longer than this are truncated


def blip_collate_fn(examples):
    """Turn a list of {"image", "text"} examples into one batch of BLIP inputs.

    Returns a dict with ``pixel_values`` (stacked image tensors), ``input_ids``
    and ``attention_mask`` (tokenized descriptions, padded to the longest one
    in the batch) and ``labels`` (the token ids with padding replaced by -100).
    """
    pixel_values = torch.stack([torch.as_tensor(ex["image"]) for ex in examples])
    captions = processor.tokenizer(
        [str(ex["text"]) for ex in examples],
        padding=True,
        truncation=True,
        max_length=MAX_CAPTION_TOKENS,
        return_tensors="pt",
    )
    # The caption tokens double as targets; -100 on the padding positions
    # keeps them out of the cross-entropy loss.
    labels = captions["input_ids"].masked_fill(captions["attention_mask"] == 0, -100)
    return {
        "pixel_values": pixel_values,
        "input_ids": captions["input_ids"],
        "attention_mask": captions["attention_mask"],
        "labels": labels,
    }


# Without this the Trainer strips "image"/"text" before the collator sees them.
training_args.remove_unused_columns = False

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=hf_dataset["train"],
    data_collator=blip_collate_fn,
    tokenizer=processor,
)


In [None]:
# Start Fine-Tuning: runs the training loop of the `trainer` built in the previous cell
trainer.train()


In [None]:
# # Convert the DataFrame into a dictionary {image_name: description}
# image_to_description = dict(zip(df_train["IMAGE_FILE"], df_train["DESCRIPTION"]))

# # Step 2: Iterate through the list of image names and get corresponding descriptions
# # images_name_list = ["image1.jpg", "image2.jpg", "image3.jpg"]  # Example list of image names

# Image preprocessing pipeline: fixed 224x224 size, PIL image -> tensor,
# then per-channel normalization with mean 0.5 and std 0.5
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)

# # Step 3 & 4: Load each image as a tensor and create the dictionary data_var
# data_var = {}

# for image_name in image_files_in_dir:
#     image_path = f"{semart_dir}/Images/{image_name}"  # Update with the correct path

#     if image_name in image_to_description:  # Ensure image exists in CSV
#         # Load and transform the image
#         image = Image.open(image_path).convert("RGB")
#         image_tensor = transform(image)

#         # Get the corresponding description
#         description = image_to_description[image_name]

#         # Store in dictionary
#         data_var[image_name] = [image_tensor, description]

# # Now, `data_var` contains image tensors mapped to their descriptions
# print(data_var)


In [None]:
# Step 1: Define Custom PyTorch Dataset
# class ImageDescriptionDataset(Dataset):
#     def __init__(self, image_names, image_dir, transform):
#         self.image_names = image_names
#         self.image_dir = image_dir
#         self.transform = transform

#     def __len__(self):
#         return len(self.image_names)

#     def __getitem__(self, idx):
#         image_name = self.image_names[idx]
#         image_path = os.path.join(self.image_dir, image_name)

#         if os.path.exists(image_path) and image_name in image_to_description:
#             # Load & transform image
#             image = Image.open(image_path).convert("RGB")
#             image_tensor = self.transform(image)

#             # Get text description
#             description = image_to_description[image_name]

#             return {"image": image_tensor, "text": description}
#         else:
#             return {"image": torch.zeros((3, 224, 224)), "text": ""}  # Return empty if missing

In [None]:
class ImageDescriptionDataset(Dataset):
    """Map-style dataset pairing image files with their text descriptions.

    Descriptions are looked up in the module-level ``image_to_description``
    dict, keyed by image file name.

    Args:
        image_names: Sequence of image file names; defines the dataset length.
        image_dir: Directory that contains the image files.
        transform: Callable applied to each RGB PIL image; its result is
            returned under the "image" key.
        placeholder_shape: Shape of the all-zero tensor returned for an entry
            whose file is missing or that has no description. It has to equal
            the shape produced by ``transform``, otherwise a batch mixing real
            and placeholder items cannot be stacked. The default matches the
            224x224 resize used by this notebook's ``transform``.
    """

    def __init__(self, image_names, image_dir, transform, placeholder_shape=(3, 224, 224)):
        self.image_names = image_names
        self.image_dir = image_dir
        self.transform = transform
        self.placeholder_shape = tuple(placeholder_shape)

    def __len__(self):
        """Number of image names the dataset was built with."""
        return len(self.image_names)

    def __getitem__(self, idx):
        """Return ``{"image": tensor, "text": description}`` for entry ``idx``.

        Entries that cannot be resolved yield a zero tensor and an empty text.
        """
        image_name = self.image_names[idx]
        image_path = os.path.join(self.image_dir, image_name)

        if not (os.path.exists(image_path) and image_name in image_to_description):
            return {"image": torch.zeros(self.placeholder_shape), "text": ""}

        # Close the file handle as soon as the pixels are converted instead of
        # leaving it open until the image object is garbage collected.
        with Image.open(image_path) as image:
            image_tensor = self.transform(image.convert("RGB"))
        return {"image": image_tensor, "text": image_to_description[image_name]}

In [None]:
# Step 2: Load Data with DataLoader (Streaming)
# `data_loader_train` is consumed further down, so it has to be defined here
# for the notebook to run top to bottom on a fresh kernel.

# Only keep files that have a description: the dataset returns an all-zero
# image with an empty caption for anything else, which is useless for training.
captioned_image_names = [name for name in image_files_in_dir if name in image_to_description]

# The pictures live in the Images/ sub-folder of the dataset root.
dataset_train = ImageDescriptionDataset(
    captioned_image_names,
    os.path.join(semart_dir, "Images"),
    transform,
)

# num_workers=0 loads in the main process. NOTE(review): the dataset class is
# defined inside the notebook, which worker processes may not be able to
# import on platforms that start them with "spawn" - raise it only after
# moving the class into a module.
data_loader_train = DataLoader(dataset_train, batch_size=8, shuffle=True, num_workers=0)

In [None]:
# Step 3: Convert DataLoader to Hugging Face Dataset
# def convert_to_hf_format(dataloader):
#     """Convert PyTorch DataLoader output into Hugging Face Dataset format"""
#     image_list, text_list = [], []
    
#     for batch in dataloader:
#         image_list.extend(batch["image"])
#         text_list.extend(batch["text"])

#     # Convert to Hugging Face Dataset
#     return HFDataset.from_dict({"image": image_list, "text": text_list})

# hf_dataset = convert_to_hf_format(data_loader_train)

In [None]:
# Step 3: Convert DataLoader to Hugging Face Streaming Dataset
def convert_to_hf_format(dataloader):
    """Expose a batched DataLoader as a streaming Hugging Face dataset of single examples.

    Every batch is expected to be a mapping with an "image" and a "text" entry
    of equal length; the batches are split back into one
    ``{"image": ..., "text": ...}`` dict per item.
    """

    def iter_examples():
        for batch in dataloader:
            # Pair the i-th image of the batch with the i-th text
            pairs = zip(batch["image"], batch["text"])
            yield from ({"image": img, "text": txt} for img, txt in pairs)

    # The generator only runs when the dataset is iterated
    return IterableDataset.from_generator(iter_examples)

# Stream the examples of the training DataLoader as a Hugging Face IterableDataset
hf_dataset_train = convert_to_hf_format(data_loader_train)

# Expose the stream under the "train" key, which is what the Trainer cell reads
dataset_dict = DatasetDict({"train": hf_dataset_train})

In [None]:
# # Step 4: Load Pretrained BLIP Model & Processor
# model_name = "Salesforce/blip-image-captioning-base"
# processor = BlipProcessor.from_pretrained(model_name)
# model = BlipForConditionalGeneration.from_pretrained(model_name)

In [None]:
# Second checkpoint: loaded as a VisionEncoderDecoderModel, together with its
# image processor and its tokenizer. This rebinds model_name, model and processor.
model_name = "ydshieh/vit-gpt2-coco-en"
tokenizer = AutoTokenizer.from_pretrained(model_name)
processor = ViTImageProcessor.from_pretrained(model_name)
model = VisionEncoderDecoderModel.from_pretrained(model_name)

In [None]:
# Same call as in the previous cell: rebinds `tokenizer` to a freshly loaded copy for model_name
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
# if tokenizer.pad_token is None:
#     tokenizer.add_special_tokens({'pad_token': '[PAD]'})
#     model.resize_token_embeddings(len(tokenizer))

In [None]:
# Step 5: Define Training Arguments
training_args = TrainingArguments(
    output_dir="./blip-finetuned-semart",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    save_strategy="epoch",
    # evaluation_strategy is intentionally not passed: the default already
    # disables evaluation, whereas an explicit None is not a valid strategy
    # value on transformers releases that expect a string here.
    eval_steps=500,
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=1,
    save_total_limit=2,
    report_to="none",
    fp16=torch.cuda.is_available(),
    # A positive max_steps takes precedence over num_train_epochs; it also
    # bounds the run for a streaming dataset that has no length.
    max_steps=5,
    # The dataset columns ("image", "text") are not arguments of the model's
    # forward(); keep them so they can be turned into model inputs at batching time.
    remove_unused_columns=False,
)

# training_args = TrainingArguments(
#     output_dir="./scienceqa_finetuned_lora",
#     evaluation_strategy="epoch",
#     save_strategy="epoch",
#     per_device_train_batch_size=8,
#     per_device_eval_batch_size=8,
#     learning_rate=3e-5,
#     num_train_epochs=3,
#     weight_decay=0.01,
#     save_total_limit=2,
#     logging_dir="./logs",
#     logging_steps=10,
#     fp16=torch.cuda.is_available(),
#     max_steps=1000,
#     no_cuda=True
# )


In [None]:
# Step 6: Fine-tune the Model with Hugging Face Trainer
# The training stream yields {"image": tensor, "text": str}. Neither key is an
# argument of the model's forward(), so the batch has to be built explicitly.
MAX_CAPTION_TOKENS = 128  # descriptions longer than this are truncated

# Padding a batch of captions needs a pad token. Reusing EOS avoids adding a
# new token (and therefore resizing the decoder embeddings).
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# The encoder-decoder model derives the decoder inputs by shifting the labels,
# which requires both ids below to be set in its config.
if model.config.pad_token_id is None:
    model.config.pad_token_id = tokenizer.pad_token_id
if model.config.decoder_start_token_id is None:
    model.config.decoder_start_token_id = tokenizer.bos_token_id


def caption_collate_fn(examples):
    """Turn a list of {"image", "text"} examples into ``pixel_values`` and ``labels``.

    ``labels`` holds the tokenized descriptions, padded to the longest one in
    the batch, with the padding positions set to -100.
    """
    pixel_values = torch.stack([torch.as_tensor(ex["image"]) for ex in examples])
    encoded = tokenizer(
        [str(ex["text"]) for ex in examples],
        padding=True,
        truncation=True,
        max_length=MAX_CAPTION_TOKENS,
        return_tensors="pt",
    )
    # Mask by attention_mask rather than by token id: pad may be the same id as
    # EOS, and only the padding positions must be kept out of the loss.
    labels = encoded["input_ids"].masked_fill(encoded["attention_mask"] == 0, -100)
    return {"pixel_values": pixel_values, "labels": labels}


# Without this the Trainer strips "image"/"text" before the collator sees them.
training_args.remove_unused_columns = False

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset_dict["train"],
#     eval_dataset=dataset_dict["train"],
    data_collator=caption_collate_fn,
    tokenizer=processor
)

In [None]:
# Run the training loop of the `trainer` built in the previous cell
trainer.train()

In [None]:
# Save Fine-tuned Model
# Weights, image processor and tokenizer all go to the same folder, so the
# captioner can later be reloaded from that folder alone.
model.save_pretrained("./blip-finetuned-semart")
processor.save_pretrained("./blip-finetuned-semart")
tokenizer.save_pretrained("./blip-finetuned-semart")

In [None]:
# Step 5: Set training configurations
# training_args = TrainingArguments(
#     output_dir="./blip-finetuned-semart",
#     per_device_train_batch_size=8,
#     num_train_epochs=5,
#     save_steps=100,
#     save_total_limit=2,
#     logging_dir="./logs",
#     logging_steps=50,
#     evaluation_strategy="steps",
#     eval_steps=500,
#     learning_rate=5e-5,
#     weight_decay=0.01,
#     fp16=True,
#     report_to="none"
# )

In [None]:
# Step 6: Fine-tune the BLIP model
# trainer = Trainer(
#     model=model,
#     args=training_args,
#     train_dataset=hf_dataset,
#     tokenizer=processor
# )

# trainer.train()

# # Save the fine-tuned model
# model.save_pretrained("./blip-finetuned-semart")
# processor.save_pretrained("./blip-finetuned-semart")