In [None]:
# Environment setup: reinstall numpy at a pinned version and install the
# remaining libraries, then kill the kernel so the new packages are picked up.

# Remove any existing numpy / transformers installs before reinstalling them.
!pip uninstall -y numpy transformers
# numpy is pinned to 1.24.4; --force-reinstall reinstalls it even if present.
!pip install numpy==1.24.4 --no-cache-dir --force-reinstall
# Unpinned install straight from the GitHub repository, so the exact
# transformers version depends on when this cell is run.
!pip install git+https://github.com/huggingface/transformers.git
!pip install accelerate bitsandbytes xformers pandas openpyxl --upgrade
# NOTE(review): pandas is already covered by the previous line; only
# scikit-learn is new here.
!pip install scikit-learn pandas


# Restart runtime to fully apply changes
# Signal 9 (SIGKILL) terminates the current kernel process immediately, so
# nothing placed after this call in the cell will execute.
# NOTE(review): presumably a "Run all" stops at this cell and the remaining
# cells must be run again after the runtime comes back — confirm.
import os
os.kill(os.getpid(), 9)

In [None]:
# Install HuggingFace Transformers from the latest GitHub repo
# NOTE(review): this is the same install command that the first setup cell
# already runs; presumably kept so it can be re-run after the runtime
# restart — confirm whether it is still needed.
!pip install git+https://github.com/huggingface/transformers.git

In [None]:
# Upgrade jax to the newest available release (no version pin).
# NOTE(review): no cell visible in this notebook imports jax directly;
# presumably this resolves a transitive dependency issue — TODO confirm.
!pip install jax --upgrade

In [None]:
# Mount Google Drive so files under /content/drive become readable
from google.colab import drive

DRIVE_MOUNT_POINT = "/content/drive"
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
import os
import pandas as pd
import numpy as np
from itertools import product
from sklearn.metrics.pairwise import cosine_similarity
import joblib
from PIL import Image
from transformers import AutoProcessor, LlavaForConditionalGeneration
import torch

# Locations of the dataset files, all rooted at the dataset folder on Drive
base_path = "/content/drive/MyDrive/fashion_dataset"

image_base_path = base_path + "/data/"                    # keeps its trailing slash
cluster_csv = base_path + "/image_clusters.csv"           # read with pd.read_csv
excel_path = base_path + "/data.xlsx"                     # read with pd.read_excel
test_data = base_path + "/test_anchor_images.csv"         # read with pd.read_csv
model_path = base_path + "/rf_regressor_model.pkl"        # loaded with joblib



In [None]:
# Load the LLaVA processor and model from the same checkpoint
LLAVA_MODEL_ID = "llava-hf/llava-1.5-7b-hf"

processor = AutoProcessor.from_pretrained(LLAVA_MODEL_ID)

# Half-precision weights, with device placement left to device_map="auto".
llava_load_kwargs = {"torch_dtype": torch.float16, "device_map": "auto"}
llava_model = LlavaForConditionalGeneration.from_pretrained(
    LLAVA_MODEL_ID, **llava_load_kwargs
)


In [None]:
# Load data
def _normalized_basename(raw_path):
    """Return the file-name part of a path, treating backslashes as separators."""
    return os.path.basename(str(raw_path).strip().replace("\\", "/"))


# Embedding columns in the cluster CSV are named "0" .. "511".
EMBED_COLS = [str(i) for i in range(512)]

# Cluster table; df_embed exposes the embedding columns indexed by file name.
df = pd.read_csv(cluster_csv)
df["filename"] = df["image_path"].apply(_normalized_basename)
df_embed = df.set_index("filename")[EMBED_COLS]

# Description lookup keyed by the base name of main_image_url. These keys are
# NOT stripped or backslash-normalised, unlike the file names above.
df_desc = pd.read_excel(excel_path)
desc_keys = df_desc["main_image_url"].apply(lambda url: os.path.basename(str(url)))
desc_map = dict(zip(desc_keys, df_desc["description"]))

# Anchor images to evaluate, reduced to file names.
test_anchor_df = pd.read_csv(test_data)
test_anchors = test_anchor_df["test_anchors"].apply(_normalized_basename).tolist()

# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only load model files from a trusted source.
model = joblib.load(model_path)


In [None]:
def get_llava_score(image_path, description):
    """Ask LLaVA to rate the image at ``image_path`` and return the score.

    The prompt embeds ``description`` and asks the model to finish its answer
    with a line of the form ``Score: X``. Only the text generated by the model
    is searched for the score; the prompt itself contains the example
    ``Score: 7`` and must never be mistaken for an answer.

    Args:
        image_path: Path of the image file to show to the model.
        description: Text inserted into the prompt as the item description.

    Returns:
        The last score found in the model's reply, clamped to the range
        1..10, or ``None`` when no score could be extracted or any step
        (image loading, preprocessing, generation) raised an exception.
        Failures are reported with ``print`` rather than raised.
    """
    import re

    try:
        prompt = (
            "<image>\n"
            f"USER: The main item description is: {description}.\n\n"
            "You are a fashion evaluator. First, look at the image and **list all the clothing items you see**, specifying their types (e.g., '1 t-shirt, 1 pair of jeans, 1 hoodie').\n\n"
            "Then determine whether the outfit contains at least one **top** and one **bottom**.\n\n"
            "Rules:\n"
            "- A valid outfit must include **at least one top and one bottom**.\n"
            "- If the outfit includes **only tops** or **only bottoms**, give it a **low score (1–3)**.\n"
            "- If the outfit includes both a top and a bottom and they match well in style and color, give a **higher score (8–10)**.\n"
            "- Use the full score range from 1 to 10.\n\n"
            "In your response, do the following:\n"
            "1. List the items you see(e.g., top, bottom, others).\n"
            "2. Explain your reasoning in 1–2 sentences.\n"
            "3. On a new line, write only: Score: X (e.g., Score: 7)\n\n"
            "ASSISTANT:"
        )

        # convert() returns a new, fully loaded image, so the file handle can
        # be released as soon as the conversion is done.
        with Image.open(image_path) as image_file:
            image = image_file.convert("RGB")

        inputs = processor(images=image, text=prompt, return_tensors="pt")

        inputs["pixel_values"] = inputs["pixel_values"].to("cuda", torch.float16)
        inputs["input_ids"] = inputs["input_ids"].to("cuda")
        inputs["attention_mask"] = inputs["attention_mask"].to("cuda")

        output = llava_model.generate(**inputs, max_new_tokens=300)

        # The generated sequence starts with the prompt tokens. Drop them so
        # the example "Score: 7" in the instructions cannot be picked up as
        # the model's answer when the reply contains no score of its own.
        prompt_len = inputs["input_ids"].shape[1]
        result = processor.decode(output[0][prompt_len:], skip_special_tokens=True)

        matches = re.findall(r"(?:score\s*(?:of|is|:)?|rated)\s*(\d+)", result, re.IGNORECASE)
        if not matches:
            print(f"Couldn't extract score from response: {result}")
            return None

        # The final verdict is requested on the last line, so use the last match.
        score = int(matches[-1])
        return min(max(score, 1), 10)

    except Exception as e:
        # Best effort: one bad image or generation failure must not abort the run.
        print(f"Error with LLaVA for {image_path}: {e}")
        return None


In [None]:
# Build recommendations
#
# For every test anchor:
#   1. take the TOP_K most cosine-similar items from each *other* cluster,
#   2. score every combination (one item per cluster) with the loaded model,
#   3. keep the best combination and ask LLaVA for a score.
cluster_dict = df.groupby("cluster")["filename"].apply(list).to_dict()
image_path_dict = dict(zip(df["filename"], df["image_path"]))

TOP_K = 3
recommendations = []

for anchor in test_anchors:
    print(f"Processing anchor: {anchor}")
    if anchor not in df_embed.index:
        print(f"Missing embedding for anchor: {anchor}")
        continue

    anchor_row = df[df["filename"] == anchor].iloc[0]
    anchor_cluster = anchor_row["cluster"]
    anchor_embedding = df_embed.loc[anchor]  # looked up once, reused for every combo
    anchor_vec = anchor_embedding.values.reshape(1, -1)

    # Most similar candidates from each cluster other than the anchor's own.
    top_k_per_cluster = {}
    for c, candidates in cluster_dict.items():
        if c == anchor_cluster:
            continue
        valid = [f for f in candidates if f in df_embed.index]
        if not valid:
            continue
        vecs = np.stack([df_embed.loc[f] for f in valid])
        sims = cosine_similarity(anchor_vec, vecs)[0]
        idx = np.argsort(sims)[::-1][:TOP_K]
        top_k_per_cluster[c] = [valid[i] for i in idx]

    if len(top_k_per_cluster) < 2:
        print(f"Skipping {anchor} — not enough clusters.")
        continue

    # -inf rather than -1 so that a best prediction at or below -1 is still
    # selected instead of silently dropping the anchor.
    best_score = -np.inf
    best_outfit = None

    for combo in product(*top_k_per_cluster.values()):
        try:
            vecs = [anchor_embedding] + [df_embed.loc[x] for x in combo]
            full_vec = np.concatenate(vecs).reshape(1, -1)
            score = model.predict(full_vec)[0]
            if score > best_score:
                best_score = score
                best_outfit = [anchor] + list(combo)
        except Exception as e:
            print(f"Error scoring combo: {e}")
            continue

    if best_outfit:
        # get description for anchor
        anchor_raw_path = image_path_dict[best_outfit[0]]
        anchor_file = os.path.basename(anchor_raw_path.replace("\\", "/"))
        description = desc_map.get(anchor_file, "")

        print(f"Looking for description with key: {anchor_file}")
        print(f"Description: {description}")

        # Missing, non-string (e.g. NaN from the spreadsheet) or blank
        # descriptions are all treated as "no description".
        if not isinstance(description, str) or not description.strip():
            print(f"No description for {anchor_file}, skipping...")
            continue

        # fix path formatting for Colab
        # NOTE(review): the path is joined onto base_path while image_base_path
        # is defined but unused — confirm which root the CSV paths are relative to.
        relative = anchor_raw_path.replace("\\", "/")
        image_path = os.path.join(base_path, relative)

        try:
            # The context manager closes the file handle once verify() is done.
            with Image.open(image_path) as img:
                img.verify()
            print(f"Image at {image_path} is valid.")
        except Exception as e:
            print(f"Image open failed: {image_path} | Error: {e}")
            continue

        # score with LLaVA
        # NOTE(review): only the anchor image is passed to LLaVA although the
        # prompt asks it to judge an outfit (top + bottom) — confirm intended.
        llava_val = get_llava_score(image_path, description)
        # Show final score you're saving
        print(f"Final score to save for {anchor}: {llava_val}")
        # NOTE(review): only the first two matches are recorded; with more than
        # two other clusters the remaining items of best_outfit are not saved.
        recommendations.append({
            "anchor": image_path_dict[best_outfit[0]],
            "match_1": image_path_dict[best_outfit[1]],
            "match_2": image_path_dict[best_outfit[2]],
            "model_score": round(best_score * 10, 2),
            "llava_score": llava_val
        })


In [None]:
# Save results
# The CSV is written to a relative path, i.e. the current working directory.
df_result = pd.DataFrame(recommendations)
df_result.to_csv("llava_similarity_model_selected.csv", index=False)
print("Saved to llava_similarity_model_selected.csv")
if len(df_result):
    # llava_score is None whenever LLaVA failed or returned no parsable score;
    # average only the rows that actually carry a number.
    valid_llava_scores = pd.to_numeric(df_result["llava_score"], errors="coerce").dropna()
    if len(valid_llava_scores):
        print(f"Average LLaVA Score: {valid_llava_scores.mean():.2f}")
    else:
        # Avoid taking the mean of an empty list (warning + "nan" output).
        print("Average LLaVA Score: n/a (no valid LLaVA scores)")

In [None]:
from sklearn.metrics import mean_squared_error
import pandas as pd

# Compare the model's score with the LLaVA score for each saved recommendation.
# Replace these with the actual score column names if different
score_columns = ["model_score", "llava_score"]

if all(col in df_result.columns for col in score_columns):
    # llava_score is missing for rows where LLaVA produced no usable score.
    # mean_squared_error rejects NaN input, so keep only complete pairs.
    paired_scores = df_result[score_columns].apply(pd.to_numeric, errors="coerce").dropna()
else:
    # No recommendations were produced, so df_result has no score columns.
    paired_scores = pd.DataFrame(columns=score_columns, dtype=float)

input_scores = paired_scores["model_score"]
anchor_scores = paired_scores["llava_score"]

# Calculate MSE
if len(paired_scores):
    mse = mean_squared_error(input_scores, anchor_scores)
    print(f"Mean Squared Error (MSE) between ml model and llava scores: {mse:.4f}")
else:
    mse = float("nan")
    print("No rows with both a model score and a LLaVA score; MSE not computed.")