In [1]:
import gzip
import shutil
import os
import json
import csv

## Data extraction

In [None]:
# Decompress every '.json.gz' archive found in source_dir into destination_dir.
source_dir = 'abo-listings/listings/metadata/'
destination_dir = 'abo-listings/listings/extracted_metadata/'

os.makedirs(destination_dir, exist_ok=True)

# Anything in the folder that is not a '.json.gz' archive is ignored.
compressed_files = [file for file in os.listdir(source_dir) if file.endswith('.json.gz')]

for compressed_file in compressed_files:
    src_path = os.path.join(source_dir, compressed_file)
    # Drop only the trailing '.gz'. str.replace() would also delete a '.gz'
    # that appears elsewhere in the name (e.g. 'a.gz.json.gz' -> 'a.json').
    dest_filename = compressed_file[:-len('.gz')]
    dest_path = os.path.join(destination_dir, dest_filename)

    # copyfileobj copies in chunks, so the archive is never loaded whole.
    with gzip.open(src_path, 'rb') as source_file, open(dest_path, 'wb') as target_file:
        shutil.copyfileobj(source_file, target_file)

    print(f"Decompressed: {compressed_file} → {dest_filename}")

print("Extraction process finished.")

## Filtering data

In [None]:
# Input and output locations for the filtering stage
source_dir = 'abo-listings/listings/extracted_metadata'
target_dir = 'abo-listings/listings/filtered_metadata'

# Header row written at the top of every generated CSV file
csv_columns = [
    'main_image_id',
    'overall_description',
    'colour_description',
    'other_description',
    'material_description',
]

# Create the output folder up front (no-op when it already exists)
os.makedirs(target_dir, exist_ok=True)

# Utility: collect the 'value' of every entry that is untagged or tagged as English
def extract_values_by_language(entries):
    """Return the ``'value'`` of each accepted entry, in input order.

    An entry is accepted when it has a ``'value'`` key and either has no
    ``'language_tag'`` key or its tag is ``'en_US'`` or ``'en_IN'``.
    """
    accepted_tags = {'en_US', 'en_IN'}
    values = []
    for item in entries:
        if 'value' not in item:
            continue
        if 'language_tag' in item and item['language_tag'] not in accepted_tags:
            continue
        values.append(item['value'])
    return values

# Utility: flatten the 'standardized_values' of every untagged or English-tagged entry
def extract_standardized_colors(entries):
    """Return all ``'standardized_values'`` items of the accepted entries.

    An entry is accepted when it has no ``'language_tag'`` key or its tag is
    ``'en_US'`` or ``'en_IN'``. Entries without ``'standardized_values'``
    contribute nothing. Order follows the input.
    """
    return [
        color
        for item in entries
        if 'language_tag' not in item or item['language_tag'] in {'en_US', 'en_IN'}
        for color in item.get('standardized_values', [])
    ]

# Fields a record must contain to be kept (loop-invariant, so built once)
required_fields = {
    'brand', 'bullet_point', 'color', 'model_name',
    'item_name', 'product_type', 'main_image_id',
    'item_keywords', 'country'
}

# Convert each JSON-lines file in source_dir into one filtered CSV in target_dir
for file_name in os.listdir(source_dir):
    if not file_name.endswith('.json'):
        continue

    json_path = os.path.join(source_dir, file_name)
    # Swap only the trailing extension; str.replace() would rewrite every
    # '.json' occurring in the name, not just the suffix.
    csv_path = os.path.join(target_dir, os.path.splitext(file_name)[0] + '.csv')

    print(f"Processing file: {json_path} → {csv_path}")

    # Load JSON lines; a malformed line is reported and skipped
    records = []
    with open(json_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if line:
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError as error:
                    print(f"Error parsing line in {file_name}: {error}")

    # Keep only JSON objects that carry every required field and belong to
    # the IN or US marketplace. The isinstance check guards against lines
    # that are valid JSON but not objects (they have no .keys()).
    valid_records = [
        rec for rec in records
        if isinstance(rec, dict)
        and required_fields.issubset(rec.keys())
        and rec.get('country') in {'IN', 'US'}
    ]

    print(f" → Valid entries found: {len(valid_records)}")

    # Write filtered records to CSV
    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(csv_columns)

        for rec in valid_records:
            overall_desc = extract_values_by_language(rec.get('bullet_point', []))

            # Standardized colour names first, then the raw colour values
            color_desc = []
            color_desc += extract_standardized_colors(rec.get('color', []))
            color_desc += extract_values_by_language(rec.get('color', []))

            other_desc = []
            for key in ['product_type', 'item_keywords']:
                other_desc += extract_values_by_language(rec.get(key, []))

            # 'material' is optional; a missing key yields an empty list
            material_desc = extract_values_by_language(rec.get('material', []))

            row = [
                rec.get('main_image_id', ''),
                '; '.join(overall_desc),
                '; '.join(color_desc),
                '; '.join(other_desc),
                '; '.join(material_desc)
            ]
            writer.writerow(row)

    print(f" → Output written: {csv_path} ({len(valid_records)} rows)\n")


In [None]:
!pip install --upgrade pip
!pip install google-genai

In [None]:
from google import genai
from google.genai import Client, types
import time

# List the attributes of the imported google-genai objects (exploratory output)
for sdk_object in (genai, Client, types):
    print(dir(sdk_object))

In [None]:
import os
import csv
import json
import time
# The notebook installs `google-genai`, whose import path is `google.genai`.
# `google.generativeai` is the older SDK and does not expose the `Client`
# class used here.
from google.genai import Client, types

# ==== Configuration ====
# Read the key from the GEMINI_API_KEY environment variable instead of pasting
# it into the notebook; falls back to an empty string when it is not set.
API_KEY = os.environ.get("GEMINI_API_KEY", "")
DAILY_LIMIT = 1500  # maximum number of API requests issued per processing run
REQUEST_INTERVAL = 60  # seconds

# ==== Client Setup ====
gemini = Client(api_key=API_KEY)

# ==== Progress Tracker ====
def read_progress(file_path):
    """Return the resume index stored in ``file_path``.

    Args:
        file_path: Path of a text file holding a single integer.

    Returns:
        The stored integer, or 0 when the file does not exist or is empty.

    Raises:
        ValueError: If the file holds something other than an integer.
    """
    if not os.path.isfile(file_path):
        return 0

    with open(file_path, 'r') as f:
        content = f.read().strip()

    if not content:
        # An empty marker records no progress; treat it like a missing file
        # instead of failing on int('').
        return 0

    try:
        return int(content)
    except ValueError as error:
        # Same exception type as before, but naming the offending file.
        raise ValueError(
            f"Progress file {file_path!r} does not contain an integer: {content!r}"
        ) from error

def write_progress(file_path, index):
    """Store ``index`` as text in ``file_path``, replacing any previous content.

    The value is written to a sibling ``<file_path>.tmp`` file first and then
    moved over the target with ``os.replace``, so an interruption during the
    write cannot leave a truncated progress file behind.

    Args:
        file_path: Destination path of the progress marker.
        index: Value to store; it is converted with ``str()``.
    """
    temp_path = f"{file_path}.tmp"
    with open(temp_path, 'w') as f:
        f.write(str(index))
    os.replace(temp_path, file_path)

# ==== API Interaction ====
def generate_visual_questions(image_bytes, description_text):
    """Request visual question/answer pairs for one product image.

    Args:
        image_bytes: Raw image bytes; sent with MIME type ``image/jpeg``.
        description_text: Product description embedded in the prompt as context.

    Returns:
        The ``text`` of the model response, or ``None`` when any exception is
        raised while building or sending the request (the error is printed).
    """
    prompt_pieces = (
        "You will receive an image along with a short product description.\n",
        f"Refer to this product description for context: {description_text}\n",
        "Create exactly 5 visually-based questions that increase in difficulty and are varied in nature.\n",
        "Each question must be answerable *solely* through visual inspection of the image — do not use external knowledge or assumptions.\n",
        "Incorporate a mix of visual features across questions, such as: color, number of elements, shapes, positioning, relative size, and any visible text.\n",
        "Ensure a balance in difficulty:\n",
        "- 2 questions should be easy (e.g., identify a color or count elements)\n",
        "- 2 should be of medium complexity (e.g., spatial arrangement, size comparisons)\n",
        "- 1 should be more difficult, requiring close observation or visual reasoning (e.g., identifying a main feature or deducing purpose from form)\n",
        "Avoid asking about non-visible attributes like materials or internal functions.\n",
        "Each answer must be a *single word* and answers should not all be 'yes' or 'no'.\n",
        "Format your output exactly like this — do not include any extra comments or explanations:\n",
        "Question 1: <your question>\n",
        "Answer 1: <your one-word answer>",
    )
    prompt = "".join(prompt_pieces)

    try:
        # The image part is built inside the try so that a failure here is
        # reported the same way as a failed API call.
        image_part = types.Part.from_bytes(data=image_bytes, mime_type='image/jpeg')
        reply = gemini.models.generate_content(
            model='gemini-2.0-flash',
            contents=[image_part, prompt],
        )
        return reply.text
    except Exception as api_error:
        print(f"API error: {api_error}")
        return None

# ==== Main Processing Logic ====
def _parse_question_answer_pairs(response_text, expected_pairs=5):
    """Turn a model response into ``(question, answer)`` tuples.

    Lines starting with 'question' / 'answer' (case-insensitive) are collected
    in order. Returns a list of ``expected_pairs`` tuples, or ``None`` when
    the counts differ from ``expected_pairs`` or any collected line has no
    ':' separating the label from its text.
    """
    lines = [line.strip() for line in response_text.split('\n') if line.strip()]
    questions = [l for l in lines if l.lower().startswith('question')]
    answers = [l for l in lines if l.lower().startswith('answer')]

    if len(questions) != expected_pairs or len(answers) != expected_pairs:
        return None

    pairs = []
    for q, a in zip(questions, answers):
        # A label without ':' has no text to extract; reject the whole
        # response rather than raising IndexError on split(...)[1].
        if ':' not in q or ':' not in a:
            return None
        pairs.append((q.split(':', 1)[1].strip(), a.split(':', 1)[1].strip()))
    return pairs


def generate_questions_for_dataset(metadata_csv, images_csv, image_root, output_csv, progress_txt):
    """Generate question/answer rows for every product listed in ``metadata_csv``.

    Args:
        metadata_csv: CSV with the columns ``main_image_id``,
            ``overall_description``, ``colour_description``,
            ``material_description`` and ``other_description``.
        images_csv: CSV with the columns ``image_id`` and ``path``.
        image_root: Directory that the ``path`` values are relative to.
        output_csv: CSV opened in append mode. A header row
            (``image_id, image_path, question, answer``) is written when the
            file is empty.
        progress_txt: Progress file read with ``read_progress`` and updated
            with ``write_progress``; the first ``resume_from`` metadata rows
            are skipped.

    Processing stops after ``DAILY_LIMIT`` requests and waits
    ``REQUEST_INTERVAL`` seconds between requests. Progress is saved after
    every request, including failed or misformatted ones, so those rows are
    not retried on the next run.
    """
    image_map = {}
    request_count = 0

    # Load image paths
    with open(images_csv, 'r', encoding='utf-8') as img_file:
        reader = csv.DictReader(img_file)
        for row in reader:
            image_map[row['image_id']] = row['path']
    print("Image metadata loaded.")

    # dirname is '' for a bare file name, and os.makedirs('') raises.
    output_parent = os.path.dirname(output_csv)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)
    resume_from = read_progress(progress_txt)
    current_line = 0

    with open(output_csv, 'a', newline='', encoding='utf-8') as out_file:
        writer = csv.writer(out_file)
        if os.stat(output_csv).st_size == 0:
            writer.writerow(['image_id', 'image_path', 'question', 'answer'])

        with open(metadata_csv, 'r', encoding='utf-8') as data_file:
            reader = csv.DictReader(data_file)
            for entry in reader:
                # Skip rows already handled in a previous run
                if current_line < resume_from:
                    current_line += 1
                    continue

                if request_count >= DAILY_LIMIT:
                    print("Daily request cap reached.")
                    break

                img_id = entry['main_image_id']
                img_rel_path = image_map.get(img_id)
                if not img_rel_path:
                    print(f"No image path found for: {img_id}")
                    current_line += 1
                    continue

                full_path = os.path.join(image_root, img_rel_path)
                if not os.path.exists(full_path):
                    print(f"Missing image file: {full_path}")
                    current_line += 1
                    continue

                try:
                    with open(full_path, 'rb') as img:
                        img_data = img.read()
                except Exception as e:
                    print(f"Failed to read image {full_path}: {e}")
                    current_line += 1
                    continue

                description = (
                    f"Overall: {entry['overall_description']}; "
                    f"Color: {entry['colour_description']}; "
                    f"Material: {entry['material_description']}; "
                    f"Other: {entry['other_description']}"
                )

                print(f"Requesting for {img_id}...")
                response_text = generate_visual_questions(img_data, description)

                if response_text:
                    # Validate the whole response before writing, so a
                    # malformed reply never leaves a partial set of rows.
                    pairs = _parse_question_answer_pairs(response_text)
                    if pairs is not None:
                        for question, answer in pairs:
                            writer.writerow([img_id, full_path, question, answer])
                        out_file.flush()
                        print(f"Finished: {img_id}")
                    else:
                        print(f"Incomplete or misformatted response for: {img_id}")
                else:
                    print(f"Generation failed for: {img_id}")

                request_count += 1
                current_line += 1
                write_progress(progress_txt, current_line)

                if request_count < DAILY_LIMIT:
                    print(f"Waiting {REQUEST_INTERVAL} seconds...")
                    time.sleep(REQUEST_INTERVAL)

# ==== Execution Parameters ====
# Which filtered listing file to read and which output batch to write
filename_tag = 'listings_3'
question_batch = 'set_4'

# Input locations
image_directory = 'abo-images-small/images/small'
csv_images = 'abo-images-small/images/metadata/images.csv'
csv_metadata = f'abo-listings/listings/filtered_metadata/{filename_tag}.csv'

# Output locations, created if they do not exist yet
output_dir = 'generated_questions'
progress_dir = 'progress'
for required_dir in (output_dir, progress_dir):
    os.makedirs(required_dir, exist_ok=True)

progress_marker = os.path.join(progress_dir, f'progress_{filename_tag}.txt')
csv_output = os.path.join(output_dir, f'questions_{filename_tag}_{question_batch}.csv')

# ==== Start Processing ====
generate_questions_for_dataset(
    metadata_csv=csv_metadata,
    images_csv=csv_images,
    image_root=image_directory,
    output_csv=csv_output,
    progress_txt=progress_marker,
)
