<a href="https://colab.research.google.com/github/kunalsonalkar/Github-Intro/blob/master/code/modules/02_LabelClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Load the ground truth dataset**

In [2]:
import pandas as pd

ground_truth_df = pd.read_excel('/content/full_batch01_groundtruth_20k.xlsx')

**Load the images**

In [3]:
from google.colab import drive
drive.mount('/content/drive')

import zipfile
import os

# Specify the path to your zip file
zip_file_path = '/content/drive/MyDrive/images.zip'  # Adjust this path
extract_folder = '/content/images'  # Folder where you want to extract images

# Create a directory for extracted images if it doesn't exist
os.makedirs(extract_folder, exist_ok=True)

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extract_folder)

Mounted at /content/drive


**Sample the data. Use the Random Seed for replicable results**

In [4]:
import pandas as pd

product_types = ground_truth_df['product_category'].unique()
total_samples = 2000
num_product_types = len(product_types)

# Calculate the number of samples per product type
samples_per_type = total_samples // num_product_types
remainder = total_samples % num_product_types

# Set a random seed for reproducibility
RANDOM_SEED = 42

# Sample the DataFrame
sample_df = pd.DataFrame()
for i, product_type in enumerate(product_types):
    product_df = ground_truth_df[ground_truth_df['product_category'] == product_type]
    sample_size = samples_per_type + (1 if i < remainder else 0)
    sample_df = pd.concat([
        sample_df,
        product_df.sample(
            n=min(sample_size, len(product_df)),
            random_state=RANDOM_SEED  # Ensures replicability
        )
    ], ignore_index=True)

In [5]:
import os
import shutil

# Define the source and destination directories
source_dir = '/content/images/images'
destination_dir = '/content/images/sample_images'

# Ensure the destination directory exists
if not os.path.exists(destination_dir):
    os.makedirs(destination_dir)

# Move the images
for filename in sample_df['filename']:
    source_path = os.path.join(source_dir, filename)
    destination_path = os.path.join(destination_dir, filename)

    # Check if the file exists
    if os.path.isfile(source_path):
        try:
            # Move the file to the destination directory
            shutil.move(source_path, destination_path)
            #print(f"Moved {filename} to {destination_dir}")
        except Exception as e:
            print(f"Failed to move {filename}: {e}")

**Load the prompt - Uplaoded in the Repo**

In [6]:
with open('/content/combined_prompt.txt', 'r') as file:
    prompt_content = file.read().strip()

In [7]:
import requests
import json
from PIL import Image
from io import BytesIO
from google.colab import userdata
import base64



**Encode Image and VLM Response**

In [8]:
def encode_image(image_path, max_image=512):
  with Image.open(image_path) as img:
    width, height = img.size
    max_dim = max(width, height)
    if max_dim > max_image:
      scale_factor = max_image / max_dim
      new_width = int(width * scale_factor)
      new_height = int(height * scale_factor)
      img = img.resize((new_width, new_height))
    buffered = BytesIO()
    img.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')

    return img_str

**Response from OpenRouter**

In [9]:
def get_vlm_response(text_prompt, img_encoded_string, model_name):
  response = requests.post(
      url="https://openrouter.ai/api/v1/chat/completions",
      headers={
          "Authorization": f"Bearer {userdata.get('OPENROUTER_API_KEY')}",
          "HTTP-Referer": "",
          "X-Title": "",
      },
      data=json.dumps({
          "model": model_name,  # Now using parameter value
          "messages": [
              {
                  "role": "user",
                  "content": [
                      {
                          "type": "text",
                          "text": text_prompt
                      },
                      {
                          "type": "image_url",
                          "image_url": {"url": f"data:image/jpeg;base64,{img_encoded_string}"}
                      }
                  ]
              }
          ]
      })
  )
  return response

In [11]:
def process_image_folder_to_dataframe(folder_path, text_prompt, model_name, image_range=(0, 10), max_image_size=512):
    """
    Process a specified range of images in a folder and get VLM responses, returning results in a DataFrame.

    Args:
        folder_path: Path to directory containing images
        text_prompt: Question/prompt for the vision model
        image_range: Tuple specifying the start and end indices for slicing (default: (0, 10))
        max_image_size: Maximum dimension for resizing (default: 512)
        model_name: VLM model identifier (default: 'google/gemini-2.5-pro-exp-03-25:free')

    Returns:
        Pandas DataFrame with columns 'Image Name' and 'Model Response'
    """
    responses = []
    valid_extensions = {'.png', '.jpg', '.jpeg', '.webp', '.gif'}

    # Get sorted list of image files in the folder
    all_files = sorted([f for f in os.listdir(folder_path) if os.path.splitext(f)[1].lower() in valid_extensions])

    # Apply slicing to select the desired range of files
    selected_files = all_files[image_range[0]:image_range[1]]

    for filename in selected_files:
        file_path = os.path.join(folder_path, filename)

        try:
            img_str = encode_image(file_path, max_image_size)
            response = get_vlm_response(text_prompt, img_str, model_name)
            responses.append({"image_file": filename, "predictions": response.json()})

        except Exception as e:
            responses.append({"image_file": filename, "predictions": f"Error processing {filename}: {str(e)}"})

    # Convert list of dictionaries to DataFrame
    return pd.DataFrame(responses)

In [20]:
import pandas as pd

results_df = pd.DataFrame()

#total_images = len([entry for entry in os.listdir(sample_folder) if os.path.isfile(os.path.join(sample_folder, entry))])  # images in sample folder
total_images = 50
for i in range(1, total_images + 1, 100):
    start = i
    end = min(i + 99, total_images)
    batch_df = process_image_folder_to_dataframe(
        "/content/images/sample_images",
        prompt_content,
        model_name="google/gemini-2.0-flash-001",
        image_range=(start, end)
    )
    results_df = pd.concat([results_df, batch_df], ignore_index=True)

**Model Responses & Evaluation**

In [23]:
import json

results_df['model_response'] = results_df['predictions'].apply(
    lambda x: x['choices'][0]['message']['content'] if isinstance(x, dict) and 'choices' in x else None
)

In [24]:
import re
import pandas as pd

def extract_labels(labels):
    # Check if labels is None, return default values if so
    if labels is None:
        return pd.Series({
            "Shape Pattern Label": None,
            "Color Pattern Label": None,
            "Fabric Type Label": None
        })

    # Find all occurrences of patterns enclosed in square brackets
    patterns = re.findall(r'\[(.*?)\]', labels)

    # Clean up the extracted patterns
    def clean_pattern(s):
        return s.strip('[] ').replace(' ', '')

    # Initialize default values
    shape_pattern = color_pattern = fabric_type = None

    # Extract patterns if available
    if len(patterns) >= 1:
        shape_pattern = clean_pattern(patterns[0])
    if len(patterns) >= 2:
        color_pattern = clean_pattern(patterns[1])
    if len(patterns) >= 3:
        fabric_type = clean_pattern(patterns[2])

    return pd.Series({
        "Shape Pattern Label": shape_pattern,
        "Color Pattern Label": color_pattern,
        "Fabric Type Label": fabric_type
    })

# Apply the function while handling NoneType inputs
results_df[["Shape Pattern Label", "Color Pattern Label", "Fabric Type Label"]] = results_df['model_response'].apply(lambda x: extract_labels(x))

In [25]:
results_df = results_df.dropna()

In [26]:
def convert_to_array(s):
    return [int(x) for x in s.split(',')]
results_df['Fabric Type Label'] = results_df['Fabric Type Label'].apply(convert_to_array)
results_df['Color Pattern Label'] = results_df['Color Pattern Label'].apply(convert_to_array)
results_df['Shape Pattern Label'] = results_df['Shape Pattern Label'].apply(convert_to_array)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  results_df['Fabric Type Label'] = results_df['Fabric Type Label'].apply(convert_to_array)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  results_df['Color Pattern Label'] = results_df['Color Pattern Label'].apply(convert_to_array)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  results_df['Shape Pat

In [27]:
results_df = results_df.reset_index(drop=True)
results_df = results_df.dropna()

In [28]:
predict_shape_expanded = pd.DataFrame(results_df['Shape Pattern Label'].tolist(), columns=[f'predicted_shape_{i}' for i in range(12)])
predict_fabric_expanded = pd.DataFrame(results_df['Color Pattern Label'].tolist(), columns=[f'predicted_color_{i}' for i in range(3)])
predict_pattern_expanded = pd.DataFrame(results_df['Fabric Type Label'].tolist(), columns=[f'predicted_fabric_{i}' for i in range(3)])

In [29]:
results_df = pd.concat([results_df, predict_shape_expanded, predict_fabric_expanded, predict_pattern_expanded], axis=1)

In [31]:
ground_truth_df = ground_truth_df.rename(columns={'filename': 'image_file'})

In [32]:
validation_set = pd.merge(results_df, ground_truth_df, on='image_file', how='inner')

In [33]:
validation_set = validation_set.dropna()

**Writing the file to send to evaluatio - this will contain predicted and actual labels for all the product attributesn**

In [69]:
validation_set.to_excel('gpt_4o_mini_00701_00900.xlsx', index=False)