In [None]:
import os
import json
from pathlib import Path
from kaggle.api.kaggle_api_extended import KaggleApi

# Download target: a `flickr8k_download` folder inside the current working directory.
script_dir = os.getcwd()
new_dir = os.path.join(script_dir, 'flickr8k_download')
os.makedirs(new_dir, exist_ok=True)

# Point the Kaggle client at the conventional credentials folder.
kaggle_config_dir = os.path.expanduser('~/.kaggle')
os.environ['KAGGLE_CONFIG_DIR'] = kaggle_config_dir

# Fail early, with an actionable message, if the credentials file is missing or unreadable.
kaggle_json_path = os.path.join(kaggle_config_dir, 'kaggle.json')
if not os.path.exists(kaggle_json_path):
    raise FileNotFoundError(f"{kaggle_json_path} does not exist. Please place the kaggle.json file in the ~/.kaggle directory.")

if not os.access(kaggle_json_path, os.R_OK):
    raise PermissionError(f"{kaggle_json_path} is not readable. Please check the file permissions.")

# Parse the credentials file; any read/parse failure surfaces as a ValueError
# that keeps the original exception attached as its cause.
try:
    with open(kaggle_json_path, 'r') as f:
        kaggle_config = json.load(f)
except Exception as e:
    raise ValueError(f"Error reading {kaggle_json_path}: {e}") from e

# Explicit check rather than `assert`: assertions are removed when Python runs
# with -O, which would silently skip this validation.
if not (isinstance(kaggle_config, dict) and 'username' in kaggle_config and 'key' in kaggle_config):
    raise ValueError(f"Error reading {kaggle_json_path}: Invalid kaggle.json file format.")

# Initialize the Kaggle API
api = KaggleApi()
try:
    api.authenticate()
    print("Successfully authenticated with the Kaggle API.")
except Exception as e:
    raise RuntimeError(f"Failed to authenticate with the Kaggle API: {e}") from e

# Dataset identifier and the marker file that records a completed download.
dataset = 'adityajn105/flickr8k'
dataset_marker = os.path.join(new_dir, 'dataset_downloaded.marker')

# Download only once: the marker is created after a successful download + unzip,
# so a failed attempt is retried on the next run.
if not os.path.exists(dataset_marker):
    try:
        api.dataset_download_files(dataset, path=new_dir, unzip=True)
        Path(dataset_marker).touch()
        print(f'Dataset downloaded and extracted to {new_dir}')
    except Exception as e:
        raise RuntimeError(f"Failed to download the dataset: {e}") from e
else:
    print(f"Dataset already downloaded and extracted in {new_dir}")


In [None]:
import os
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt

# Set the number of images to display
num_images_to_display = 5  # Change this number to display more or fewer images

# Locations of the extracted dataset (new_dir is set by the download cell).
dataset_dir = new_dir
images_dir = os.path.join(dataset_dir, 'Images')
captions_file = os.path.join(dataset_dir, 'captions.txt')

# Check if the images directory and captions file exist
if not os.path.exists(images_dir):
    raise FileNotFoundError(f"Images directory not found: {images_dir}")

if not os.path.exists(captions_file):
    raise FileNotFoundError(f"Captions file not found: {captions_file}")

# Read the captions file. header=0 together with `names` means the first row is
# always treated as a header and replaced by the column names given here.
try:
    captions = pd.read_csv(captions_file, delimiter=',', header=0, names=['image', 'caption'])
except Exception as e:
    raise ValueError(f"Error reading captions file: {e}") from e

# Display the first few rows of the captions
print("Captions DataFrame:")
print(captions.head())

# List the image files. os.listdir returns entries in arbitrary order, so sort
# them to make the preview below reproducible between runs and machines.
image_files = sorted(os.listdir(images_dir))
print("First few image files in the images directory:")
print(image_files[:5])

# Function to display an image with its caption
def display_image_with_caption(image_file, caption):
    """Plot one image from ``images_dir`` with ``caption`` as the figure title.

    If the file is not present under ``images_dir`` a "File not found" line is
    printed and nothing is plotted. Returns ``None`` in both cases.
    """
    image_path = os.path.join(images_dir, image_file)

    if os.path.exists(image_path):
        picture = Image.open(image_path)

        # 8x8 inch figure, axes hidden so only the picture and its caption show.
        plt.figure(figsize=(8, 8))
        plt.imshow(picture)
        plt.title(caption)
        plt.axis('off')
        plt.show()
    else:
        print(f"File not found: {image_path}")

# Show the leading `num_images_to_display` caption rows, one figure per row.
preview_rows = captions.head(num_images_to_display)
for image_name, caption_text in zip(preview_rows['image'], preview_rows['caption']):
    display_image_with_caption(image_name, caption_text)


In [None]:
import os
import pandas as pd
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split
import torch
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss
from transformers import CLIPProcessor, CLIPModel
from tqdm import tqdm
from peft import LoraConfig, get_peft_model

# Set up device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Captions file produced by the download cell (path relative to the working directory).
captions_file = 'flickr8k_download/captions.txt'

# Peek at the first few lines to confirm the delimiter. Each line already ends
# with a newline, so suppress print's own to avoid blank lines in the output.
with open(captions_file, 'r', encoding='utf-8') as file:
    for _ in range(5):
        print(file.readline(), end='')

# Load the captions data; the file's first row is treated as a header and
# replaced by the two column names below.
captions = pd.read_csv(captions_file, delimiter=',', header=0, names=['image', 'caption'])

# Image directory
image_dir = 'flickr8k_download/Images'

# Preprocess images.
# PILToTensor keeps the raw 0-255 uint8 pixel values. ToTensor would scale them
# to [0, 1], and the CLIP processor used in the training/evaluation loops applies
# its own 1/255 rescale by default, so the images would be rescaled twice and no
# longer match the preprocessing that plain PIL images receive at inference time.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.PILToTensor()
])

def load_image(image_path):
    """Open ``image_path``, convert it to RGB and apply the module-level ``transform``."""
    rgb_image = Image.open(image_path).convert('RGB')
    return transform(rgb_image)

# Custom Dataset
class Flickr8kDataset(Dataset):
    """Map-style dataset that yields ``(image, caption)`` pairs.

    ``captions`` is indexed positionally: column 0 holds the image file name
    (relative to ``image_dir``) and column 1 holds the caption. ``processor``
    is stored on the instance but not used by the dataset itself.
    """

    def __init__(self, captions, image_dir, processor):
        self.captions, self.image_dir, self.processor = captions, image_dir, processor

    def __len__(self):
        # One sample per caption row.
        return len(self.captions)

    def __getitem__(self, idx):
        file_name, caption = self.captions.iloc[idx, 0], self.captions.iloc[idx, 1]
        full_path = os.path.join(self.image_dir, file_name)
        return load_image(full_path), caption

# Initialize the model and processor
model_name = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(model_name).to(device)
processor = CLIPProcessor.from_pretrained(model_name)

# Attach LoRA adapters to the attention projections of the model.
config = LoraConfig(
    r=8,             # rank of the LoRA matrix
    lora_alpha=32,   # scaling factor for the LoRA update
    target_modules=["self_attn.k_proj", "self_attn.v_proj", "self_attn.q_proj", "self_attn.out_proj"],
    lora_dropout=0.1,
)
model = get_peft_model(model, config)
# Move again so that any modules added by the wrapper live on the same device.
model.to(device)

# Create dataset and an 80/20 train/validation split.
dataset = Flickr8kDataset(captions, image_dir, processor)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
# Seed the split so the same rows end up in train/val on every run; without a
# generator the partition changes each time and losses are not comparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(dataset, [train_size, val_size], generator=split_generator)
# NOTE(review): the split is per caption row. If one image appears in several
# rows it can land in both subsets - confirm whether that is acceptable.

train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False)

# Optimizer and Loss function.
# Only parameters with requires_grad=True can be updated, so hand just those to
# the optimizer instead of every parameter of the wrapped model.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = AdamW(trainable_params, lr=5e-5)
loss_fn = CrossEntropyLoss()

def train_one_epoch(model, dataloader, optimizer, loss_fn, device):
    """Run one optimisation pass over ``dataloader`` and return the mean batch loss.

    Every batch is an ``(images, captions)`` pair. Both are fed through the
    module-level ``processor``; the resulting ``logits_per_image`` and
    ``logits_per_text`` are each scored with ``loss_fn`` against the targets
    ``0..batch_size-1`` (item i is paired with caption i) and the two losses
    are averaged.

    Args:
        model: Model called as ``model(**inputs)``; put into training mode here.
        dataloader: Iterable of ``(images, captions)`` batches.
        optimizer: Optimizer stepped once per batch.
        loss_fn: Callable ``loss_fn(logits, targets)`` returning a scalar tensor.
        device: Device the processed inputs and targets are moved to.

    Returns:
        Mean loss over the batches as a float; ``0.0`` if there were no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for images, captions in tqdm(dataloader):
        # The processor works on host data, so the raw images are not moved to
        # the device first; only the processed batch is transferred.
        # truncation=True keeps over-long captions within the tokenizer's limit.
        inputs = processor(
            text=list(captions),
            images=images,
            return_tensors="pt",
            padding=True,
            truncation=True,
        ).to(device)

        outputs = model(**inputs)
        logits_per_image = outputs.logits_per_image
        logits_per_text = outputs.logits_per_text

        # Matching image/caption pairs sit on the diagonal of the logit matrices.
        ground_truth = torch.arange(logits_per_image.size(0), device=device)

        loss = (loss_fn(logits_per_image, ground_truth) + loss_fn(logits_per_text, ground_truth)) / 2

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Count batches ourselves so an empty loader does not raise ZeroDivisionError.
    return total_loss / num_batches if num_batches else 0.0

def evaluate(model, dataloader, device, loss_fn=None):
    """Return the mean batch loss of ``model`` over ``dataloader`` without updating it.

    Mirrors the loss used for training: each ``(images, captions)`` batch goes
    through the module-level ``processor``, and ``logits_per_image`` /
    ``logits_per_text`` are scored against the targets ``0..batch_size-1``.

    Args:
        model: Model called as ``model(**inputs)``; put into eval mode here.
        dataloader: Iterable of ``(images, captions)`` batches.
        device: Device the processed inputs and targets are moved to.
        loss_fn: Optional callable ``loss_fn(logits, targets)``. Defaults to a
            fresh ``CrossEntropyLoss()`` so the function no longer depends on a
            module-level ``loss_fn`` being defined.

    Returns:
        Mean loss over the batches as a float; ``0.0`` if there were no batches.
    """
    if loss_fn is None:
        loss_fn = CrossEntropyLoss()

    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for images, captions in dataloader:
            # Preprocess on the host and move only the processed batch to the
            # device; truncation guards against captions beyond the token limit.
            inputs = processor(
                text=list(captions),
                images=images,
                return_tensors="pt",
                padding=True,
                truncation=True,
            ).to(device)

            outputs = model(**inputs)
            logits_per_image = outputs.logits_per_image
            logits_per_text = outputs.logits_per_text

            # Matching image/caption pairs sit on the diagonal.
            ground_truth = torch.arange(logits_per_image.size(0), device=device)

            loss = (loss_fn(logits_per_image, ground_truth) + loss_fn(logits_per_text, ground_truth)) / 2
            total_loss += loss.item()
            num_batches += 1

    # Count batches ourselves so an empty loader does not raise ZeroDivisionError.
    return total_loss / num_batches if num_batches else 0.0

# Output location for the fine-tuned adapter and the processor files.
save_directory = 'ftm/fine-tuned-model'

# Number of full passes over the training split.
num_epochs = 20

for epoch in range(num_epochs):
    print(f"Starting epoch {epoch+1}")
    train_loss = train_one_epoch(
        model=model,
        dataloader=train_dataloader,
        optimizer=optimizer,
        loss_fn=loss_fn,
        device=device,
    )
    print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.4f}")

# Persist the model first, then the processor, into the same directory.
model.save_pretrained(save_directory)
processor.save_pretrained(save_directory)
print(f"Model and processor saved to {save_directory}")

# Single validation pass after training has finished.
val_loss = evaluate(model=model, dataloader=val_dataloader, device=device)
print(f"Validation Loss: {val_loss:.4f}")


In [None]:
from transformers import CLIPProcessor, CLIPModel
import torch
from PIL import Image
import requests
from peft import LoraConfig, get_peft_model, PeftModel
import matplotlib.pyplot as plt

# Define the directory where the fine-tuned model is saved
save_directory = 'ftm/fine-tuned-model'

# Load the base model and processor
model_name = "openai/clip-vit-base-patch32"
base_model = CLIPModel.from_pretrained(model_name)
processor = CLIPProcessor.from_pretrained(model_name)

# Load the fine-tuned LoRA adapter directly onto the plain base model.
# The base model must NOT be wrapped with get_peft_model first: wrapping twice
# nests one PEFT wrapper inside another, the saved adapter keys then no longer
# match the module names, and the trained weights are not applied. The adapter
# configuration is read from save_directory, so it is not rebuilt here.
model = PeftModel.from_pretrained(base_model, save_directory)

# Move model to device and switch to inference mode (disables LoRA dropout).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.eval()
print(f"Model loaded on {device}")

# Example image URL
# url = "https://media.istockphoto.com/id/1500816306/photo/adult-black-male-admiring-the-streets-of-london-on-a-sunny-day-while-holding-a-smartphone-in.webp?s=2048x2048&w=is&k=20&c=0aHww9bA2AdPEUyPkbTiZBFM-ZNeSUY_oZ2nGWAorXI="
# url = "https://images.freeimages.com/images/large-previews/71f/my-new-bicycle-1431529.jpg"
# url = "https://images.freeimages.com/images/large-previews/f02/computer-room-1242684.jpg"
# url = "https://images.freeimages.com/images/large-previews/4ca/tree-1552037.jpg"
# url = "https://images.freeimages.com/images/large-previews/15b/lap-cat-1243719.jpg"
# url = "https://images.freeimages.com/images/large-previews/647/snowy-mountain-1378865.jpg"
# url = "https://images.freeimages.com/images/large-previews/792/captiol-building-1228390.jpg"
url = "https://images.freeimages.com/images/large-previews/429/plane-1449679.jpg"

# Bound the request with a timeout and fail loudly on an HTTP error instead of
# handing an error page to the image decoder.
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
# Let urllib3 undo any transfer compression before PIL reads the raw stream.
response.raw.decode_content = True
# convert() forces the image to be fully read from the stream and guarantees
# three colour channels.
image = Image.open(response.raw).convert("RGB")
image.show()

# Evaluate with different text prompts
texts = [
    "A photo of a cat",
    "A photo of a dog",
    "A photo of a car",
    "A photo of a building",
    "A photo of a tree",
    "A photo of a computer",
    "A photo of a person",
    "A photo of a landscape",
    "A photo of a beach",
    "A photo of a mountain",
    "A photo of a city",
    "A photo of a forest",
    "A photo of a bicycle",
    "A photo of a plane"
]
inputs = processor(text=texts, images=[image], return_tensors="pt", padding=True).to(device)

# Pure inference: no gradients are needed, so skip building the autograd graph.
with torch.no_grad():
    outputs = model(**inputs)
logits_per_image = outputs.logits_per_image
logits_per_text = outputs.logits_per_text
# Softmax over the prompts gives one probability per text for the single image.
probs = logits_per_image.softmax(dim=1)
print("Probabilities for multiple texts:", probs)

# Plot the probabilities
probs = probs.cpu().detach().numpy().flatten()
plt.figure(figsize=(10, 6))
plt.barh(texts, probs, color='skyblue')
plt.xlabel('Probability')
plt.title('Model Confidence for Different Text Prompts')
plt.gca().invert_yaxis()  # keep the prompts in the order they were listed
plt.show()
