In [None]:
##########EXTRACTING INFORMATION FROM CATALOGUE TEXT INTO SPREADSHEET#########

# @title
# --- Museum Catalogue Transcription with Gemini + Multi-entry Saving Fix ---
# Works in Google Colab

# --- Step 1: Install necessary libraries ---
!pip install pandas ipywidgets openpyxl requests google-genai --quiet

# --- Step 2: Imports and setup ---
import pandas as pd
from IPython.display import display, HTML, Image as ColabImage
import ipywidgets as widgets
import requests
import os
import re
import time
from google import genai
from google.genai import types
from google.colab import files

# --- Step 3: Set Gemini API key securely ---
os.environ["GOOGLE_API_KEY"] = "INSERTKEYHERE"  # <-- Replace with your actual key

# Initialize Gemini client
try:
    client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
    print("Gemini Client Initialized.")
except Exception as e:
    print(f"Error initializing Gemini client: {e}")
    client = None

# ---------------------------------------
# Define input/output paths
# ---------------------------------------
drive_folder = "/content/drive/MyDrive/folder"  # <-- change path
output_excel = os.path.join(drive_folder, "output.xlsx") # <-- change output name

# Load or initialize DataFrame
if os.path.exists(output_excel):
    df = pd.read_excel(output_excel)
    print(f"Loaded existing progress file with {len(df)} rows.")
else:
    image_files = [f for f in os.listdir(drive_folder) if f.lower().endswith(('.jpg', '.jpeg', '.png', '.tif'))]
    df = pd.DataFrame(columns=[
        "Filename", "Catalogue number", "Object name", "Maker/Donor",
        "Date of creation", "Object entry information", "Object/Inventory number",
        "Raw_Response"
    ])
    for file in image_files:
        df.loc[len(df)] = [file, "", "", "", "", "", "", ""]
    print(f"ound {len(image_files)} image(s) in Google Drive folder.")

# Resume progress
analyzed_rows = df["Raw_Response"].notna() & (df["Raw_Response"].astype(str).str.strip() != "")
if analyzed_rows.any():
    last_done = df[analyzed_rows].index.max() + 1
    print(f"Resuming from image #{last_done + 1} of {len(df)}...")
else:
    last_done = 0
    print("ðŸš€ Starting fresh analysis...")

columns = df.columns.tolist()

# ---------------------------------------
# Build widgets
# ---------------------------------------
search_box = widgets.Text(description='Search:', placeholder='Enter keyword(s)...',
                          layout=widgets.Layout(width='60%'), style={'description_width': 'initial'})
column_dropdown = widgets.Dropdown(options=['All columns'] + columns, value='All columns',
                                   description='Search in:', style={'description_width': 'initial'})
search_button = widgets.Button(description=' Search', button_style='success')
clear_button = widgets.Button(description='Clear', button_style='warning')
prev_button = widgets.Button(description='Prev')
next_button = widgets.Button(description='Next')
analyze_button = widgets.Button(description='Analyze Image', button_style='primary')
save_button = widgets.Button(description='Save Now', button_style='info')
output = widgets.Output()

current_index = last_done
filtered_df = df.copy()

# ---------------------------------------
# Helper functions
# ---------------------------------------
def get_current_filename():
    if filtered_df.empty:
        return None
    record = filtered_df.iloc[current_index]
    return record["Filename"]

def display_record(idx):
    with output:
        output.clear_output()
        if filtered_df.empty:
            display(HTML("<h4>No matching results.</h4>"))
            analyze_button.disabled = True
            return
        record = filtered_df.iloc[idx]
        info_html = f"<h3>ðŸ“– Record ({idx + 1} / {len(filtered_df)})</h3><table>"
        for col in columns:
            val = str(record[col])
            if len(val) > 400:
                val = val[:400] + "..."
            info_html += f"<tr><td><b>{col}</b></td><td>{val}</td></tr>"
        info_html += "</table><br>"
        display(HTML(info_html))
        img_path = os.path.join(drive_folder, record["Filename"])
        if os.path.exists(img_path):
            display(ColabImage(filename=img_path, width=400))
        else:
            display(HTML("<p><i>Image file not found.</i></p>"))
            analyze_button.disabled = True

def save_progress():
    df.to_excel(output_excel, index=False)
    print(f"Progress saved to {output_excel}")

def parse_multiple_entries(text):
    """Parse multiple entries per page with full multiline field capture."""

    # Define the exact catalogue field labels
    field_labels = [
        "Continuation from previous page",
        "Catalogue number",
        "Object name",
        "Maker/Donor",
        "Date of creation",
        "Object entry information",
        "Object/Inventory number"
    ]

    # Build a regex-safe alternation of labels
    label_pattern = "|".join([re.escape(lbl) for lbl in field_labels])

    # Split into entries only on Catalogue number OR Continuation
    blocks = re.split(r"(?=Continuation from previous page\s*:|Catalogue number\s*:)",
                      text, flags=re.IGNORECASE)

    entries = []

    for block in blocks:
        block = block.strip()
        if not block:
            continue

        entry = {lbl: "N/A" for lbl in field_labels}
        entry["Continuation"] = "N/A"

        for lbl in field_labels:
            # Regex:
            # <label> : ( capture everything ) until the next LABEL or end of string
            pattern = rf"{re.escape(lbl)}\s*:\s*(.*?)(?=\n(?:{label_pattern})\s*:|$)"
            match = re.search(pattern, block, flags=re.IGNORECASE | re.DOTALL)

            if match:
                val = match.group(1).strip()
                if lbl == "Continuation from previous page":
                    entry["Continuation"] = val
                else:
                    entry[lbl] = val

        entries.append(entry)

    return entries

# ---------------------------------------
# Main analysis
# ---------------------------------------
def perform_visual_analysis(_=None):
    """Perform OCR + structured transcription with multiple entries per page (safe global append)."""
    global df, filtered_df, current_index

    if client is None:
        with output:
            display(HTML("<h4>Gemini client not initialized. Check API key.</h4>"))
        return

    filename = get_current_filename()
    if not filename:
        with output:
            display(HTML("<h4>No image found for this record.</h4>"))
        return

    analyze_button.disabled = True
    analyze_button.description = "Analyzing..."
    with output:
        display(HTML(f"<h4>Analyzing image: <b>{filename}</b></h4>"))

    img_path = os.path.join(drive_folder, filename)
    try:
        with open(img_path, "rb") as f:
            img_bytes = f.read()
        image = types.Part.from_bytes(data=img_bytes, mime_type="image/jpeg")

        prompt = """
        This is a scanned museum catalogue page. Transcribe all object entries found on this page.

        Each page may contain several catalogue entries. For every entry, return:

        Continuation from previous page: [TRUE/FALSE]
        Catalogue number:
        Object name:
        Maker/Donor:
        Date of creation:
        Object entry information:
        Object/Inventory number:

        Detect whether the top text is a continuation of a previous entry
        (no catalogue number or object name at the top = continuation TRUE).

        If a value cannot be identified, use "N/A".
        Keep this exact field order and repeat the block for each entry.
        """

        # --- Retry logic ---
        result_text = ""
        for attempt in range(1, 3):
            try:
                response = client.models.generate_content(
                    model="gemini-2.5-flash-preview-09-2025", ###Change model here if needed
                    contents=[prompt, image]
                )
                if hasattr(response, "text") and response.text:
                    result_text = response.text.strip()
                elif hasattr(response, "candidates") and response.candidates:
                    try:
                        result_text = response.candidates[0].content.parts[0].text.strip()
                    except Exception:
                        result_text = ""
                if result_text:
                    break
                print(f"Empty response on attempt {attempt}, retrying...")
                time.sleep(3)
            except Exception as inner_e:
                print(f"Retry {attempt} failed: {inner_e}")
                time.sleep(3)

        if not result_text:
            raise ValueError("Model returned an empty or invalid response after retries.")

        # Parse all entries
        entries = parse_multiple_entries(result_text)
        if not entries:
            raise ValueError("No entries parsed from Gemini output.")

        # SAFELY remove old rows and append new ones
        df.drop(df[df["Filename"] == filename].index, inplace=True)
        new_df = pd.DataFrame([
            {
                "Filename": filename,
                "Catalogue number": e.get("Catalogue number", "N/A"),
                "Object name": e.get("Object name", "N/A"),
                "Maker/Donor": e.get("Maker/Donor", "N/A"),
                "Date of creation": e.get("Date of creation", "N/A"),
                "Object entry information": e.get("Object entry information", "N/A"),
                "Object/Inventory number": e.get("Object/Inventory number", "N/A"),
                "Raw_Response": result_text
            }
            for e in entries
        ])

        # Append to the global dataframe in place
        df = pd.concat([df, new_df], ignore_index=True)

        # Save progress and refresh filtered view
        save_progress()
        filtered_df = df.copy()

        # --- Display summary ---
        with output:
            display(HTML(f"<h4>Analysis Complete for <b>{filename}</b> â€” {len(entries)} entries found.</h4>"))
            for i, e in enumerate(entries, 1):
                block = "<pre style='background:#f6f8fa; padding:10px; border-radius:6px;'>"
                block += f"<b>Entry {i}</b>\n" + "\n".join([f"{k}: {v}" for k, v in e.items()])
                block += "</pre>"
                display(HTML(block))
            display(HTML("<h5>Full Raw Response:</h5>"))
            display(HTML(f"<pre style='white-space:pre-wrap; background:#eef; padding:10px; border-radius:6px;'>{result_text}</pre>"))


    except Exception as e:
        with output:
            display(HTML(f"<h4>Error while analyzing <b>{filename}</b>: {e}</h4>"))

    finally:
        analyze_button.description = "Analyze Image"
        analyze_button.disabled = False

# ---------------------------------------
# Widget bindings
# ---------------------------------------
def perform_search(_=None):
    global filtered_df, current_index
    query = search_box.value.strip().lower()
    selected = column_dropdown.value
    if not query:
        filtered_df = df.copy()
    else:
        if selected == "All columns":
            mask = df.apply(lambda r: r.astype(str).str.lower().str.contains(query).any(), axis=1)
        else:
            mask = df[selected].astype(str).str.lower().str.contains(query)
        filtered_df = df[mask]
    current_index = 0
    display_record(current_index)

def clear_search(_):
    global filtered_df, current_index
    search_box.value = ""
    column_dropdown.value = "All columns"
    filtered_df = df.copy()
    current_index = 0
    display_record(current_index)

def on_next(_):
    global current_index
    if not filtered_df.empty:
        current_index = (current_index + 1) % len(filtered_df)
        display_record(current_index)

def on_prev(_):
    global current_index
    if not filtered_df.empty:
        current_index = (current_index - 1) % len(filtered_df)
        display_record(current_index)

search_button.on_click(perform_search)
search_box.on_submit(perform_search)
clear_button.on_click(clear_search)
next_button.on_click(on_next)
prev_button.on_click(on_prev)
analyze_button.on_click(perform_visual_analysis)
save_button.on_click(lambda _: save_progress())

# ---------------------------------------
# Display UI
# ---------------------------------------
controls = widgets.HBox([search_box, column_dropdown, search_button, clear_button])
nav = widgets.HBox([prev_button, next_button, analyze_button, save_button])
ui = widgets.VBox([controls, nav, output])
display(ui)

with output:
    output.clear_output()
    display_record(current_index)

#display_record(current_index)

In [None]:
# --- Step 1: Setup and Imports ---
import pandas as pd
import re
import time
from google.colab import drive

# --- Step 2: Mount Google Drive ---
print("Mounting Google Drive...")
drive.mount('/content/drive')
print("Drive mounted.\n")

# --- Step 3: Define file paths ---
excel_path = '/content/drive/MyDrive/input.xlsx' # <-- change path
csv_url = "https://coimages.sciencemuseumgroup.org.uk/datasets/smg_object_records_all_09_04_2025.csv" ### <-- KEEP URL for retrieving object records
output_path = '/content/drive/MyDrive/output.xlsx' # <-- change path

# --- Step 4: Load Excel file ---
print("Loading Excel file...")
start_time = time.time()
df_excel = pd.read_excel(excel_path)
print(f"Loaded Excel file with {len(df_excel)} rows and {len(df_excel.columns)} columns.")
print(f"Columns found: {list(df_excel.columns)}\n")

# --- Step 5: Extract and clean Object/Inventory numbers ---
print("Extracting 'Object/Inventory number' entries matching pattern like 1915â€“155 (anywhere in text)...")

col_name = "Object/Inventory number"
if col_name not in df_excel.columns:
    raise ValueError(f"Column '{col_name}' not found in the Excel file. Please verify the column name.")

df_excel[col_name] = df_excel[col_name].astype(str).str.replace("â€“", "-")

pattern = re.compile(r"(\d{1,6}\s*[-â€“]\s*\d{1,6})")

# Always keep all rows
df_excel["extracted_ids"] = df_excel[col_name].apply(lambda x: pattern.findall(x))

# Normalize IDs for matching
df_excel["clean_id"] = df_excel["extracted_ids"].apply(
    lambda lst: [re.sub(r"\s+", "", i) for i in lst] if lst else []
)

# Build ID universe (only from rows that actually have IDs)
all_ids = [i for sublist in df_excel["clean_id"] for i in sublist]
unique_ids = sorted(set(all_ids))
id_set = set(unique_ids)

print(f"Found {len(unique_ids)} unique valid inventory-like IDs across {len(df_excel)} rows.\n")
print(f"Unique formatted IDs ready for matching: {len(id_set)}\n")

# --- Step 6: Prepare for chunked reading of large CSV ---
print("Starting chunked read of large CSV file...")
chunk_size = 15000
matched_chunks = []
total_rows = 0
match_count = 0
start_csv_time = time.time()

first_chunk = pd.read_csv(csv_url, nrows=5)
cols = list(first_chunk.columns)
print(f"CSV columns: {cols}\n")

# --- Step 7: Read CSV in chunks and search for matches ---
print("Searching for matches in the 'identifier' column...")
for i, chunk in enumerate(
    pd.read_csv(csv_url, chunksize=chunk_size, usecols=['identifier'], low_memory=False)
):
    total_rows += len(chunk)

    chunk["identifier"] = (
        chunk["identifier"]
        .astype(str)
        .str.replace("â€“", "-")
        .str.replace(" ", "")
    )

    matches = chunk[chunk["identifier"].isin(id_set)]

    if not matches.empty:
        print(f"Found {len(matches)} matches in chunk {i+1}.")
        match_count += len(matches)
        matched_chunks.append(matches)

    elapsed = time.time() - start_csv_time
    est_total_chunks = 500000 / chunk_size
    est_remaining = max((elapsed / (i + 1)) * (est_total_chunks - (i + 1)), 0)
    print(
        f"Processed {total_rows:,} rows. "
        f"Estimated time left: ~{est_remaining/60:.1f} minutes\n"
    )

# --- Step 8: Combine matches and re-read full info ---
if matched_chunks:
    all_matches = pd.concat(matched_chunks)
    unique_ids_matched = all_matches["identifier"].unique()
    print(f"Total unique matching IDs found: {len(unique_ids_matched)}")
else:
    print("No matches found.")
    unique_ids_matched = []

# --- Step 9: Load only matching rows fully from CSV ---
if len(unique_ids_matched) > 0:
    print("Re-reading full rows for matched identifiers...")
    matched_data = []
    for chunk in pd.read_csv(csv_url, chunksize=15000, low_memory=False):
        chunk["identifier"] = (
            chunk["identifier"]
            .astype(str)
            .str.replace("â€“", "-")
            .str.replace(" ", "")
        )
        matched_rows = chunk[chunk["identifier"].isin(unique_ids_matched)]
        if not matched_rows.empty:
            matched_data.append(matched_rows)

    df_csv_matched = pd.concat(matched_data)
    print(f"Loaded {len(df_csv_matched)} matched rows from CSV.\n")
else:
    df_csv_matched = pd.DataFrame()

# --- Step 10: Merge (LEFT JOIN, preserving ID-less rows) ---
print("Merging matched data from both sources (preserving all Excel rows)...")

# Explode IDs; rows with no IDs produce NaN clean_id and are preserved
exploded_excel = df_excel.explode("clean_id")

merged = pd.merge(
    exploded_excel,
    df_csv_matched,
    left_on="clean_id",
    right_on="identifier",
    how="left"
)

print(f"Merge complete. Combined rows (including ID-less rows): {len(merged)}\n")

# --- Step 11: Save results ---
print("Saving results to Excel & CSV...")

merged.to_excel(output_path, index=False)

csv_output_path = output_path.replace(".xlsx", ".csv")
merged.to_csv(csv_output_path, index=False)

print(
    f"Done! Files saved:\n"
    f"- Excel: {output_path}\n"
    f"- CSV:   {csv_output_path}"
)

In [None]:
# --- Dataset Comparison: Old Catalogue vs Science Museum Group ---
# Retains verbose feedback, timing, retry logic, and logs for spider chart preparation

!pip install pandas openpyxl requests google-genai tqdm --quiet

import pandas as pd
import os, time, json, random, re
from tqdm import tqdm
from google import genai
from google.genai import types
from google.colab import files
from datetime import datetime

# --- Step 1: Gemini API setup ---
os.environ["GOOGLE_API_KEY"] = "INSERTAPIKEYHERE"  ### INSERT API KEY
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
print("Gemini client initialized.")

# --- Step 2: Load dataset ---
input_path = "/content/drive/MyDrive/input.xlsx"
df = pd.read_excel(input_path)
print(f"Loaded {len(df)} rows from {os.path.basename(input_path)}")

# --- Step 3: Define comparison column sets (generic names: old vs new) ---

old_cols = [
    "Object name",
    "Maker/Donor",
    "Date of creation",
    "Object entry information",
    "Object/Inventory number"
]

new_cols = [
    "title",
    "description",
    "category",
    "object_name",
    "date",
    "place",
    "maker"
]

# --- Step 4: Safe Gemini call with retry and feedback ---

def extract_json_from_text(text):
    """Extract the first valid JSON object from text safely."""
    text = text.strip()
    text = text.replace("```json", "").replace("```", "")
    match = re.search(r"\{[\s\S]*\}", text)
    if match:
        snippet = match.group(0)
        snippet = re.sub(r",\s*([\]}])", r"\1", snippet)
        snippet = snippet.replace("â€¦", "").replace("...", "")
        return snippet
    return None


def call_gemini_with_retry(prompt, max_retries=4, base_delay=5):
    """Call Gemini safely with exponential backoff and improved JSON cleaning."""
    for attempt in range(1, max_retries + 1):
        try:
            response = client.models.generate_content(
                model="gemini-2.5-flash-lite", #### Change Model if needed
                contents=prompt
            )
            raw = response.text.strip()
            print(f"ðŸ§¾ Gemini raw output (first 400 chars): {raw[:400]}...\n")

            cleaned = extract_json_from_text(raw)
            if not cleaned:
                raise ValueError("No valid JSON found in Gemini output")

            try:
                return json.loads(cleaned)
            except json.JSONDecodeError:
                cleaned = cleaned.replace("\n", " ").replace("\r", " ")
                cleaned = cleaned.replace("\\", "\\\\")
                cleaned = re.sub(r"[\x00-\x1f\x7f-\x9f]", "", cleaned)
                return json.loads(cleaned)

        except Exception as e:
            print(f"Gemini parse error (attempt {attempt}): {e}")
        sleep_time = base_delay * attempt + random.uniform(0, 2)
        print(f"Waiting {sleep_time:.1f}s before retry...")
        time.sleep(sleep_time)
    return None

# --- Step 5: Build comparison prompt (generic old vs new) ---

def make_prompt(old_text, new_text):
    return f"""
    You are comparing two museum object catalogue entries referring to the same physical object.
    One comes from an older printed catalogue, and the other from the modern Science Museum Group database.

    --- OLD Catalogue Entry ---
    {old_text}

    --- NEW (Science Museum Group) Entry ---
    {new_text}

    Task:
    1. Identify and count entities and details in each entry, categorizing them as:
       - Object names
       - Makers / Donors
       - Organisations/institutions
       - Dates or date ranges
       - Places
       - Descriptive details (form, use, scientific role, etc.)
       - Other factual attributes

    2. Compute total counts per category for both OLD and NEW datasets.

    3. Identify which entities appear uniquely in one entry but not the other.

    Return ONLY valid JSON in the following structure:
    {{
      "old_counts": {{
          "object_names": int,
          "makers": int,
          "organisations/institutions": int,
          "dates": int,
          "places": int,
          "descriptions": int,
          "other": int,
          "total": int
      }},
      "new_counts": {{
          "object_names": int,
          "makers": int,
          "organisations/institutions": int,
          "dates": int,
          "places": int,
          "descriptions": int,
          "other": int,
          "total": int
      }},
      "unique_to_old": {{
          "object_names": [list],
          "makers": [list],
          "organisations/institutions": [list],
          "dates": [list],
          "places": [list],
          "descriptions": [list],
          "other": [list]
      }},
      "unique_to_new": {{
          "object_names": [list],
          "makers": [list],
          "organisations/institutions": [list],
          "dates": [list],
          "places": [list],
          "descriptions": [list],
          "other": [list]
      }}
    }}
    """

# --- Step 6: Resume support ---

output_path = "/content/drive/MyDrive/output1.xlsx"
log_path = "/content/drive/MyDrive/output2.xlsx"

if os.path.exists(output_path):
    out_df = pd.read_excel(output_path)
    processed = set(out_df["RowIndex"])
    print(f"Resuming from existing results ({len(processed)} processed).")
else:
    out_df = pd.DataFrame()
    processed = set()

logs = []
rows_out = []
start_time_all = time.time()

FAST_MODE = False
MAX_ROWS = 10

# --- Step 7: Main processing loop ---

for idx, row in tqdm(df.iterrows(), total=len(df)):
    if FAST_MODE and idx >= MAX_ROWS:
        print("Fast mode enabled â€” stopping early for testing.")
        break
    if idx in processed:
        continue

    old_text = " ".join(str(row[c]) for c in old_cols if c in df.columns and pd.notna(row[c]))
    new_text = " ".join(str(row[c]) for c in new_cols if c in df.columns and pd.notna(row[c]))

    if not old_text.strip() and not new_text.strip():
        continue

    print(f"\nðŸ”¹ Row {idx + 1}/{len(df)} â€” comparing entry")
    prompt = make_prompt(old_text, new_text)
    print(f"Prompt length: {len(prompt)} characters")

    start_time = time.time()
    result = call_gemini_with_retry(prompt)
    elapsed = round(time.time() - start_time, 1)
    print(f"Completed in {elapsed}s")

    if not result:
        print(f"Failed for row {idx}, skipping.\n")
        logs.append({"RowIndex": idx, "Status": "Failed", "Duration": elapsed})
        continue

    try:
        old, new = result["old_counts"], result["new_counts"]

        row_out = {
            "RowIndex": idx,
            "Old_Total": old.get("total", 0),
            "New_Total": new.get("total", 0),
            "Old_ObjectNames": old.get("object_names", 0),
            "New_ObjectNames": new.get("object_names", 0),
            "Old_Makers": old.get("makers", 0),
            "New_Makers": new.get("makers", 0),
            "Old_Organisations/Institutions": old.get("organisations/institutions", 0),
            "New_Organisations/Institutions": new.get("organisations/institutions", 0),
            "Old_Dates": old.get("dates", 0),
            "New_Dates": new.get("dates", 0),
            "Old_Places": old.get("places", 0),
            "New_Places": new.get("places", 0),
            "Old_Descriptions": old.get("descriptions", 0),
            "New_Descriptions": new.get("descriptions", 0),
            "Old_Other": old.get("other", 0),
            "New_Other": new.get("other", 0),
            "Unique_to_Old": json.dumps(result.get("unique_to_old", {})),
            "Unique_to_New": json.dumps(result.get("unique_to_new", {}))
        }

        rows_out.append(row_out)
        out_df = pd.concat([out_df, pd.DataFrame([row_out])], ignore_index=True)
        out_df.to_excel(output_path, index=False)

        # Save CSV version
        csv_output_path = output_path.replace(".xlsx", ".csv")
        out_df.to_csv(csv_output_path, index=False)

        logs.append({
            "RowIndex": idx,
            "Status": "Success",
            "Duration": elapsed,
            "Timestamp": datetime.now().isoformat()
        })
        pd.DataFrame(logs).to_excel(log_path, index=False)

        print(f"Saved row {idx} â€” OLD={old.get('total', 0)}, NEW={new.get('total', 0)}\n")
        time.sleep(random.uniform(1.0, 2.0))

    except Exception as e:
        print(f"Failed to parse/save row {idx}: {e}")
        logs.append({"RowIndex": idx, "Status": f"Error: {e}", "Duration": elapsed})

total_time = round(time.time() - start_time_all, 1)
print(f"\nCompleted comparison for {len(rows_out)} rows.")
print(f"Results: {output_path}")
print(f"Logs: {log_path}")
print(f"Total time: {total_time / 60:.2f} min")

files.download(output_path)


In [None]:
########################IMPROVED DOCUMENTATION PROMPT WITH LLM LANGUAGE CHECKS#################
############DOCUMENTATION CHECKS############
# --- Historic Catalogue Schema Extraction Script ---
# Retains XLSX + CSV output, logs, retry logic, etc.
# NOW uses LLM to detect harmful and outdated language.

!pip install pandas openpyxl requests google-genai tqdm --quiet

import pandas as pd
import os, time, json, random, re
from tqdm import tqdm
from google import genai
from google.genai import types
from google.colab import files
from datetime import datetime
import re


# ---------------------------------------------
# Clean illegal Excel characters
# ---------------------------------------------
def clean_excel_string(s):
    if isinstance(s, str):
        return re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F]", "", s)
    return s


# ---------------------------------------------
# Step 1: Gemini API setup
# ---------------------------------------------
os.environ["GOOGLE_API_KEY"] = "INSERTAPIKEYHERE"  ### Change to your API KEY
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
print("Gemini client initialized.")


# ---------------------------------------------
# Step 2: Load dataset
# ---------------------------------------------
input_path = "/content/drive/MyDrive/input.xlsx"
df = pd.read_excel(input_path)
print(f"Loaded {len(df)} rows from {os.path.basename(input_path)}")


# ---------------------------------------------
# Step 3: Historic text columns
# ---------------------------------------------
historic_cols = [
    "Object name",
    "Maker/Donor",
    "Date of creation",
    "Object entry information",
    "Object/Inventory number"
]


# ---------------------------------------------
# Step 4: Safe Gemini call with retry
# ---------------------------------------------

def extract_json_from_text(text):
    text = text.strip()
    text = text.replace("```json", "").replace("```", "")
    match = re.search(r"\{[\s\S]*\}", text)
    if match:
        snippet = match.group(0)
        snippet = re.sub(r",\s*([\]}])", r"\1", snippet)
        snippet = snippet.replace("â€¦", "").replace("...", "")
        return snippet
    return None


def call_gemini_with_retry(prompt, max_retries=4, base_delay=5):
    for attempt in range(1, max_retries + 1):
        try:
            response = client.models.generate_content(
                model="gemini-2.5-flash-lite",
                contents=prompt
            )
            raw = response.text.strip()
            print(f"ðŸ§¾ Gemini raw output (first 400 chars): {raw[:400]}...\n")

            cleaned = extract_json_from_text(raw)
            if not cleaned:
                raise ValueError("No valid JSON found in Gemini output")

            try:
                return json.loads(cleaned)
            except json.JSONDecodeError:
                cleaned = cleaned.replace("\n", " ").replace("\r", " ")
                cleaned = cleaned.replace("\\", "\\\\")
                cleaned = re.sub(r"[\x00-\x1f\x7f-\x9f]", "", cleaned)
                return json.loads(cleaned)

        except Exception as e:
            print(f"Gemini parse error (attempt {attempt}): {e}")

        sleep_time = base_delay * attempt + random.uniform(0, 2)
        print(f" Waiting {sleep_time:.1f}s before retry...")
        time.sleep(sleep_time)

    return None


# ---------------------------------------------
# Step 5: Prompt including harmful/outdated detection
# ---------------------------------------------

def make_prompt(entry_text):
    return f"""
You are analysing historic museum catalogue entries whose terminology and structure
predate modern museum documentation standards. Your task is to perform THREE functions:

1) Detect whether each modern museum information type is present within the historic entry.
2) Extract the entities or values associated with that information type.
3) Detect any harmful or outdated language present in the entry (details below).

Historic catalogues often contain information in implicit or differently-structured ways.
Interpret the historic phrasing and map it onto the modern extraction schema below.

--------------------------------------------------------------------
TRANSLATED EXTRACTION SCHEMA (TARGET FORMAT)
--------------------------------------------------------------------

ESSENTIAL
- object_number: catalogue number of the object (primary identifier appearing before name)
- secondary_identifier: object ID or other identifying number appearing toward the end of the description
- whole_or_part: determine whether the entry describes the whole object ("Whole") or a part of an object ("Part")
- item_count: the number of physically separate items mentioned; if unknown â†’ "N/A"
- collection: name of the catalogue only
- legal_status: copyright, owner, donor, purchase or other legal statement; if none stated â†’ null
- object_name: name/title of the object or catalogue entry
- description: the descriptive text of the catalogue entry
- location: where the item is stored or displayed; if not mentioned â†’ null

CORE
- hazards: list hazardous materials or warnings referenced; separate with ";"
- makers_and_associated_people:
    list of {{ "name": <string>, "role": <string> }}
- place_made: places explicitly stated as where the object was made or used; output as list or null
- date_made: when the object was made

ENHANCED
- materials: extract all materials explicitly stated as making up the object or being a product of the object; separated by ";"
- measurements: all dimensional information; separated by ";"
- interpretation: any contextual, historical, significance-related or interpretive statements
- interpretation_source: references, publications, authors, or sources for interpretation
- interpretation_date: use catalogue publication date unless another specific interpretation date is given

--------------------------------------------------------------------
INTERPRETATION RULES
--------------------------------------------------------------------
- Object number and secondary identifier must reflect your rules:
    â€¢ object_number â†’ catalogue number BEFORE the name
    â€¢ secondary_identifier â†’ object ID appearing later or at end of text
- whole_or_part must be exactly "Whole" or "Part".
- Item count must be numeric or "N/A".
- Legal status:
    â€¢ donor mentioned â†’ "donated"
    â€¢ purchased â†’ "purchased"
    â€¢ loaned â†’ "loaned"
    â€¢ bequeathed â†’ "bequeathed"
    â€¢ absent â†’ null
- Maker roles must be explicit and accurate (maker, donor, inventor, user, etc.)
- Place-related information must include the role (place made, place used, etc.)
- Materials and hazards must only include items that the object is made out of or produces.
- Measurements must be extracted verbatim.
- Interpretation must include only interpretive content (purpose, significance, context).
- Interpretation source includes only explicit references.
- Any field not present â†’ return null.

--------------------------------------------------------------------
ADDITIONAL TASKS: LANGUAGE SENSITIVITY CHECKS
--------------------------------------------------------------------
Analyse the entry and identify:

1. **HARMFUL LANGUAGE**
   Includes (but is not limited to):
   - derogatory descriptors
   - racist/colonial terminology
   - sexist terms
   - ableist terms
   - dehumanising phrasing
   - offensive ethnonyms
   - pejorative references to groups or individuals

2. **OUTDATED / OBSOLETE LANGUAGE**
   Includes but limited to:
   - archaic ethnonyms
   - obsolete medical or psychiatric terms
   - outdated occupational labels
   - antiquated or colonial-era descriptors
   - archaic spellings or terminology no longer used today

Return:
- count of terms for each category
- list of the exact words/phrases found

--------------------------------------------------------------------
OUTPUT FORMAT
--------------------------------------------------------------------
Return a single JSON object with the extraction schema above, AND two additional objects:

"harmful_language": {{
    "count": int,
    "terms": [list]
}},
"outdated_language": {{
    "count": int,
    "terms": [list]
}}

JSON only. No commentary.

--------------------------------------------------------------------
INPUT
--------------------------------------------------------------------
{entry_text}

--------------------------------------------------------------------
OUTPUT
--------------------------------------------------------------------
JSON only.
"""


# ---------------------------------------------
# Step 6: Resume support
# ---------------------------------------------
output_path = "/content/drive/MyDrive/output.xlsx"
output_csv = "/content/drive/MyDrive/output.csv"
log_path = "/content/drive/MyDrive/output2.xlsx"

if os.path.exists(output_path):
    out_df = pd.read_excel(output_path)
    processed = set(out_df["RowIndex"])
    print(f"Resuming from existing results ({len(processed)} processed).")
else:
    out_df = pd.DataFrame()
    processed = set()

logs = []
rows_out = []
start_time_all = time.time()

FAST_MODE = False
MAX_ROWS = 10


# ---------------------------------------------
# Step 7: Main Loop
# ---------------------------------------------
for idx, row in tqdm(df.iterrows(), total=len(df)):

    if FAST_MODE and idx >= MAX_ROWS:
        break
    if idx in processed:
        continue

    entry_text = " ".join(
        str(row[c]) for c in historic_cols if c in df.columns and pd.notna(row[c])
    )
    if not entry_text.strip():
        continue

    print(f"\nðŸ”¹ Row {idx+1}/{len(df)} â€” extracting structured schema + language checks")

    prompt = make_prompt(entry_text)
    result = call_gemini_with_retry(prompt)
    if not result:
        logs.append({"RowIndex": idx, "Status": "Failed"})
        continue

    # Store output
    row_out = {"RowIndex": idx}
    row_out.update(result)

    rows_out.append(row_out)
    out_df = pd.concat([out_df, pd.DataFrame([row_out])], ignore_index=True)

    # Clean Excel characters
    out_df = out_df.applymap(clean_excel_string)

    # Save XLSX + CSV
    out_df.to_excel(output_path, index=False)
    out_df.to_csv(output_csv, index=False)

    logs.append({"RowIndex": idx, "Status": "Success"})
    pd.DataFrame(logs).to_excel(log_path, index=False)

    print(
        f"Saved row {idx} â€” harmful={result['harmful_language']['count']} "
        f"outdated={result['outdated_language']['count']}"
    )

    time.sleep(random.uniform(1.0, 2.0))


# ---------------------------------------------
# Final Summary
# ---------------------------------------------
print("Completed extraction.")
print(output_path)
print(output_csv)
print(log_path)

files.download(output_path)
files.download(output_csv)
