# Setup

In [1]:
%load_ext autoreload
%autoreload 2

import os
os.chdir("/nfs/turbo/coe-chaijy/sstorks/simulation_informed_pcr4nlu/TRAVEl")
from travel import init_travel
init_travel()

import argparse
from collections import defaultdict, Counter
from copy import deepcopy
from itertools import product
import json
import numpy as np
import os
import pickle
from PIL import Image
from pprint import pprint
import shutil
import spacy
import time
import torch
from tqdm import tqdm
from transformers import AutoModelForVision2Seq, AutoModelForCausalLM, AutoProcessor, BitsAndBytesConfig, AutoModelForSequenceClassification, AutoTokenizer, PhrasalConstraint           

from travel.constants import RESULTS_DIR, IMAGES_CHUNK_SIZE, HF_TOKEN, CONFIG_PATH
from travel.data.captaincook4d import CaptainCook4DDataset
from travel.data.ego4d import Ego4DMistakeDetectionDataset
from travel.data.mistake_detection import MistakeDetectionTasks
from travel.data.vqa import VQAResponse, get_vqa_response_token_ids, VQAOutputs, DIALOG_START_TOKENS, IMAGE_TOKENS, USER_START_TOKENS, USER_END_TOKENS, ASSISTANT_START_TOKENS, ASSISTANT_END_TOKENS, IVQA_PREAMBLE, IVQA_SUCCESS_QUESTION
from travel.data.vqg import generate_vqg_prompt_icl
from travel.model import simple_lm_prompt_beam_search, simple_vlm_prompt_beam_search, compute_completion_log_likelihoods, compute_completion_log_likelihoods_encoder_decoder, compute_completion_log_likelihoods_vlm
from travel.model.metrics import question_coherence_metrics_nli, question_coherence_metrics_vlm, generate_det_curve, compile_accuracy_and_coherence_metrics, generate_3d_overview_graph
from travel.model.mistake_detection import MISTAKE_DETECTION_THRESHOLDS
from travel.model.nli import NLI_MODEL_PATH, NLI_BATCH_SIZE
from travel.model.vqa import run_vqa_with_visual_filter
from travel.model.vqg import cleanup_generated_question



Qualtrics API helpers:

In [2]:
import requests
import json
import base64
import io
from PIL import Image
import uuid
from typing import List, Optional
from pprint import pprint

class QualtricsAnnotationSurvey:
    def __init__(self, api_token: str, data_center: str, library_name: str):
        """
        Initialize Qualtrics API client
        
        Args:
            api_token: Your Qualtrics API token
            data_center: Your data center (e.g., 'ca1', 'fra1', 'sydney1')
        """
        self.api_token = api_token
        self.data_center = data_center
        self.library_name = library_name
        self.base_url = f"https://{data_center}.qualtrics.com/API/v3"
        self.headers = {
            "X-API-TOKEN": api_token,
            "Content-Type": "application/json"
        }
    
    def get_library_id(self) -> Optional[str]:
        """
        Get the main graphics library ID for the account
        
        Returns:
            Library ID or None if failed
        """
        url = f"{self.base_url}/libraries"
        
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            
            result = response.json()
            libraries = result.get('result', {}).get('elements', [])
            
            # Find the graphics library
            for library in libraries:
                pprint(library)
                if library.get('libraryName') == self.library_name:
                    return library.get('libraryId')
            
            # If no graphics library found, return None
            print("No graphics library found")
            return None
            
        except requests.exceptions.RequestException as e:
            print(f"Failed to get libraries: {e}")
            return None
    
    def upload_image_to_library(self, pil_image: Image.Image, filename: str) -> Optional[str]:
        """
        Upload PIL Image to Qualtrics Graphics Library
        
        Args:
            pil_image: PIL Image object
            filename: Name for the uploaded file
            
        Returns:
            String URL of uploaded image or None if failed
        """
        # Get the library ID first
        library_id = self.get_library_id()
        if not library_id:
            print("Could not find graphics library")
            return None
        
        # Convert PIL image to bytes
        img_bytes = io.BytesIO()
        pil_image.save(img_bytes, format='PNG')
        img_bytes.seek(0)
        
        # Prepare multipart form data
        files = {
            'file': (filename, img_bytes.getvalue(), 'image/png')
        }
        
        headers_upload = {
            "X-API-TOKEN": self.api_token
        }
        
        url = f"{self.base_url}/libraries/{library_id}/graphics"
        
        try:
            response = requests.post(url, headers=headers_upload, files=files)
            response.raise_for_status()
            
            result = response.json()
            # The response should contain the graphic ID and URL
            graphic_id = result.get('result', {}).get('id')
            if graphic_id:
                # Construct the URL for the uploaded image
                return f"https://{self.data_center}.qualtrics.com/WRQualtricsShared/Graphics/{graphic_id}"
            else:
                print("No graphic ID returned")
                return None
                
        except requests.exceptions.RequestException as e:
            print(f"Failed to upload image {filename}: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response content: {e.response.text}")
            return None
        
    def add_survey_instructions(self, survey_id: str) -> bool:
        """
        Add an instruction block as the first question in the survey
        
        Args:
            survey_id: ID of the survey
            
        Returns:
            True if successful, False otherwise
        """
#         instructions_html = """<p>You will be shown pairs of procedural texts and egocentric (POV) photos from people's perspective after trying to perform the procedure. Based on the photo, you will be asked to determine whether the person has made a mistake in completing the given procedure, or successfully completed it.</p>
# <p><strong>Guidelines:</strong></p>
# <ul>
# <li>If you see the object that should be affected by the procedure and it appears to have undergone the action(s) in the procedure (e.g., moving to a specific location, slicing, opening), the image should be labeled a success. If the object's state contradicts the success state for the procedure, the image should be labeled a mistake.</li>
# <li>If you don't see the object or only see part of it, it could either be a success or mistake. Make your best guess based on what you do see in the image.</li>
# <li>If you're ever not sure (e.g., the image doesn't have enough information or it's blurry/low quality), also make your best guess based on any context clues you see.</li>
# </ul>
# <p>Click <strong>Next</strong> to begin.</p>"""
        
        question_data = {
            "QuestionText": "Click Next to begin the annotation task.",
            "QuestionType": "Text", 
            "Selector": "ML",
            "DataExportTag": "instructions"
        }

        url = f"{self.base_url}/survey-definitions/{survey_id}/questions"
        
        try:
            response = requests.post(url, headers=self.headers, json=question_data)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Failed to create instructions: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response status: {e.response.status_code}")
                try:
                    error_detail = e.response.json()
                    print(f"Error details: {json.dumps(error_detail, indent=2)}")
                except:
                    print(f"Response content: {e.response.text[:500]}...")
            return False
        
    def create_survey(self, survey_name: str) -> Optional[str]:
        """
        Create a new survey
        
        Args:
            survey_name: Name for the survey
            
        Returns:
            Survey ID or None if failed
        """
        data = {
            "SurveyName": survey_name,
            "Language": "EN",
            "ProjectCategory": "CORE"
        }
        
        url = f"{self.base_url}/survey-definitions"
        
        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()
            
            result = response.json()
            return result['result']['SurveyID']
        except requests.exceptions.RequestException as e:
            print(f"Failed to create survey: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response content: {e.response.text}")
            return None
    
    def create_mistake_detection_question(self, survey_id: str, image_url: str, 
                                        task_description: str, question_id: str) -> bool:
        """
        Create a mistake detection question with embedded image and binary choice
        
        Args:
            survey_id: ID of the survey
            image_data_url: Base64 data URL of the image
            task_description: Text describing the task
            question_id: Unique identifier for this question
            
        Returns:
            True if successful, False otherwise
        """
        # HTML content with embedded image and task description
        # question_html = f"""<p>You will be shown pairs of procedural texts and egocentric (POV) photos from people's perspective after trying to perform the procedure. Based on the photo, you will be asked to determine whether the person has made a mistake in completing the given procedure, or successfully completed it.</p>

        # <p><strong>Guidelines:</strong></p>
        # <ul>
        # <li>If you see the object that should be affected by the procedure and it appears to have undergone the action(s) in the procedure (e.g., moving to a specific location, slicing, opening), the image should be labeled a success. If the object's state contradicts the success state for the procedure, the image should be labeled a mistake.</li>
        # <li>If you don't see the object or only see part of it, it could either be a success or mistake. Make your best guess based on what you do see in the image.</li>
        # <li>If you're ever not sure (e.g., the image doesn't have enough information or it's blurry/low quality), also make your best guess based on any context clues you see.</li>
        # </ul>

        # <p><strong>Target procedure:</strong> {task_description}</p>

        # <p><img src="{image_data_url}" style="max-width: 100%; height: auto;" alt="Task completion image"/></p>

        # <p><strong>Question: Did the person make a mistake in completing the described task?</strong></p>"""

        # question_html = f"""<p><strong>Target procedure:</strong> {task_description}</p>
        # <p><img src="{image_data_url}" alt="Task image"/></p>
        # <p><strong>Did the person make a mistake completing the described task?</strong></p>"""

        if image_url:
            question_html = f"""<p>Target procedure: {task_description}</p>
            <p><img src="{image_url}" alt="Task image"/></p>
            <p>Did the person make a mistake completing this task?</p>"""
        else:
            print(f"Failed to upload image for question {question_id}")
            question_html = f"<p>Target procedure: {task_description}</p><p>Did the person make a mistake completing this task?</p>"

        question_data = {
            "QuestionText": question_html,
            "DataExportTag": f"mistake_detection_{question_id}",
            "QuestionType": "MC",
            "Selector": "SAVR",  # Single Answer Vertical
            "Configuration": {
                "QuestionDescriptionOption": "UseText"
            },
            "QuestionDescription": f"Mistake Detection - {question_id}",
            "Choices": {
                "1": {
                    "Display": "Yes - There is a mistake"
                },
                "2": {
                    "Display": "No - Task completed successfully"
                }
            },
            "ChoiceOrder": ["1", "2"],
            "Validation": {
                "Settings": {
                    "ForceResponse": "ON",
                    "Type": "None"
                }
            }
        }

        url = f"{self.base_url}/survey-definitions/{survey_id}/questions"
        
        try:
            response = requests.post(url, headers=self.headers, json=question_data)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Failed to create question {question_id}: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response status: {e.response.status_code}")
                try:
                    error_detail = e.response.json()
                    print(f"Error details: {json.dumps(error_detail, indent=2)}")
                except:
                    print(f"Response content: {e.response.text[:500]}...")
            
            # Debug: Print question data size and structure
            print(f"Question HTML length: {len(question_html)} characters")
            print(f"Image data URL length: {len(image_url)} characters")
            return False
    
    def create_confidence_question(self, survey_id: str, question_id: str) -> bool:
        """
        Create a confidence rating question (1-5 scale)
        
        Args:
            survey_id: ID of the survey
            question_id: Unique identifier for this question
            
        Returns:
            True if successful, False otherwise
        """
        question_data = {
            "QuestionText": "How confident are you in your answer?",
            "DataExportTag": f"confidence_{question_id}",
            "QuestionType": "MC",
            "Selector": "SAVR",
            "Configuration": {
                "QuestionDescriptionOption": "UseText"
            },
            "QuestionDescription": f"Confidence Rating - {question_id}",
            "Choices": {
                "1": {"Display": "1 - Not confident at all"},
                "2": {"Display": "2 - Slightly confident"},
                "3": {"Display": "3 - Moderately confident"},
                "4": {"Display": "4 - Very confident"},
                "5": {"Display": "5 - Extremely confident"}
            },
            "ChoiceOrder": ["1", "2", "3", "4", "5"],
            "Validation": {
                "Settings": {
                    "ForceResponse": "ON",
                    "Type": "None"
                }
            }
        }
        
        url = f"{self.base_url}/survey-definitions/{survey_id}/questions"
        
        try:
            response = requests.post(url, headers=self.headers, json=question_data)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"Failed to create confidence question {question_id}: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response content: {e.response.text}")
            return False
    
    def publish_survey(self, survey_id: str, description: str = "") -> Optional[str]:
        """
        Publish the survey to make it active and accessible
        
        Args:
            survey_id: ID of the survey to publish
            description: Optional description for the publication
            
        Returns:
            Published survey URL or None if failed
        """
        # First, we need to get the current survey state
        url = f"{self.base_url}/survey-definitions/{survey_id}"
        
        try:
            # Get current survey definition
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            
            # Update the survey to published state
            publish_data = {
                "isActive": True
            }
            
            # Use PATCH to update survey state
            response = requests.patch(url, headers=self.headers, json=publish_data)
            response.raise_for_status()
            
            # Generate the public survey URL
            survey_url = f"https://{self.data_center}.qualtrics.com/jfe/form/{survey_id}"
            
            print(f"Survey published successfully!")
            print(f"Survey URL: {survey_url}")
            
            return survey_url
            
        except requests.exceptions.RequestException as e:
            print(f"Failed to publish survey: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"Response content: {e.response.text}")
            return None

def generate_annotation_survey(api_token: str, data_center: str, library_name: str,
                             images: List[Image.Image], 
                             task_descriptions: List[str],
                             survey_name: str = "Mistake Detection Annotation",
                             publish: bool = True) -> Optional[str]:
    """
    Generate a complete annotation survey with images and task descriptions
    
    Args:
        api_token: Qualtrics API token
        data_center: Qualtrics data center
        images: List of PIL Image objects
        task_descriptions: List of task description strings (same length as images)
        survey_name: Name for the created survey
        publish: Whether to automatically publish the survey
        
    Returns:
        Survey URL if successful and published, Survey ID if not published, None if failed
    """
    if len(images) != len(task_descriptions):
        raise ValueError("Number of images must match number of task descriptions")
    
    # Initialize API client
    client = QualtricsAnnotationSurvey(api_token, data_center, library_name)
    
    # Create survey
    survey_id = client.create_survey(survey_name)
    if not survey_id:
        print("Failed to create survey")
        return None
    
    print(f"Created survey with ID: {survey_id}")
    
    # Add instructions as the first question
    # if client.add_survey_instructions(survey_id):
    #     print("✓ Added survey instructions")
    # else:
    #     print("✗ Failed to add survey instructions")
    
    # Process each image and description pair
    for i, (image, description) in enumerate(zip(images, task_descriptions)):
        question_id = f"q{i+1:03d}"
        
        print(f"Processing question {question_id}...")
        
        image_url = client.upload_image_to_library(image, f"image_{question_id}.png")

        # Create mistake detection question
        if client.create_mistake_detection_question(survey_id, image_url, description, question_id):
            print(f"✓ Created mistake detection question {question_id}")
        else:
            print(f"✗ Failed to create mistake detection question {question_id}")
            continue
        
        # Create confidence question
        if client.create_confidence_question(survey_id, question_id):
            print(f"✓ Created confidence question for {question_id}")
        else:
            print(f"✗ Failed to create confidence question for {question_id}")
    
    # Publish survey if requested
    if publish:
        survey_url = client.publish_survey(survey_id, f"Automated annotation survey: {survey_name}")
        if survey_url:
            print(f"\n🚀 Survey published and ready for use!")
            return survey_url
        else:
            print(f"\n⚠️ Survey created but publishing failed. Survey ID: {survey_id}")
            return survey_id
    else:
        print(f"\n📋 Survey created successfully. Survey ID: {survey_id}")
        print(f"Manual URL: https://{data_center}.qualtrics.com/jfe/form/{survey_id}")
        return survey_id

# Load dataset 

In [3]:
# Load approopriate evaluation dataset
dataset = None
for retry in range(5):
    print(f"Loading evaluation dataset (try {retry})...")
    try:
        dataset = Ego4DMistakeDetectionDataset(data_split="val", 
                                                mismatch_augmentation=True,
                                                multi_frame=False,
                                                debug_n_examples_per_class=250)
        break
    except Exception as e:
        print("Encountered error during data loading:")
        pprint(e)
        time.sleep(60)
if dataset is None:
    raise ValueError("Could not load dataset after retrying!")

Loading evaluation dataset (try 0)...


# Sample data

First reorganize data by mistake type (and success) to estimate distribution of labels.

In [4]:
from collections import defaultdict
from pprint import pprint

examples_by_mistake_type = defaultdict(list)

for example in dataset:
    examples_by_mistake_type[str(example.mistake_type)].append(example)

mistake_type_dist = {}
for mistake_type in examples_by_mistake_type:
    mistake_type_dist[mistake_type] = len(examples_by_mistake_type[mistake_type]) / len(dataset)

pprint(mistake_type_dist)

{'Action Incomplete': 0.102,
 'MisalignSRL_ARG1': 0.174,
 'MisalignSRL_V': 0.062,
 'MisalignSRL_V_ARG1': 0.162,
 'None': 0.5}


In [5]:
import random
from PIL import Image

TOTAL_EXAMPLES_TO_ANNOTATE = 100
N_EXAMPLES_PER_ANNOTATOR = 25
examples_to_annotate = {
    "Action Incomplete": 10,
    "MisalignSRL_ARG1": 18,
    "MisalignSRL_V": 6,
    "MisalignSRL_V_ARG1": 16,
    "None": 50,
}
assert TOTAL_EXAMPLES_TO_ANNOTATE == sum(list(examples_to_annotate.values())), "Configured distribution of annotated example types doesn't add up to total number of expected examples to annotate."

selected_examples = []
for k in examples_to_annotate:
    selected_examples += random.sample(examples_by_mistake_type[k], examples_to_annotate[k])

assert len(selected_examples) == TOTAL_EXAMPLES_TO_ANNOTATE, "Didn't sample expected number of examples to annotate."

random.shuffle(selected_examples)
assert TOTAL_EXAMPLES_TO_ANNOTATE % N_EXAMPLES_PER_ANNOTATOR == 0, "Number of examples per annotator must evenly divide total number of annotated examples."
sample_chunks = []
# Split into even chunks
n_chunks = TOTAL_EXAMPLES_TO_ANNOTATE // N_EXAMPLES_PER_ANNOTATOR
for chunk in range(n_chunks):
    examples = selected_examples[N_EXAMPLES_PER_ANNOTATOR*chunk:N_EXAMPLES_PER_ANNOTATOR*(chunk+1)]
    assert all(len(e.frames) == 1 for e in examples)

    descriptions = [e.procedure_description for e in examples]
    images = [e.frames[0] for e in examples]
    
    assert len(descriptions) == len(images) == N_EXAMPLES_PER_ANNOTATOR, f"Didn't get correct number of descriptions or images for chunk {chunk}."

    sample_chunks.append((images, descriptions))

assert len(sample_chunks) == n_chunks, "Didn't get the correct number of chunks."

# Generate Qualtrics forms for annotation

In [None]:
from travel.constants import QUALTRICS_API_TOKEN, QUALTRICS_DATA_CENTER, QUALTRICS_LIBRARY_NAME

for ci, (images, descriptions) in enumerate(sample_chunks):

    if ci == 1:
        break

    # Generate survey
    result = generate_annotation_survey(
        api_token=QUALTRICS_API_TOKEN,
        data_center=QUALTRICS_DATA_CENTER,
        library_name=QUALTRICS_LIBRARY_NAME,
        images=images,
        task_descriptions=descriptions,
        survey_name="Procedural Mistake Detection Human Annotation",
        publish=True,
    ) # NOTE: make sure questions are shuffled

    if result:
        if result.startswith('http'):
            print(f"\n✅ Survey {ci} created successfully!")
            print(f"🔗 Share this URL: {result}")
        else:
            print(f"\n📝 Survey ID: {result}")
    else:
        print("❌ Survey {ci} creation failed")

NameError: name 'sample_chunks' is not defined

In [None]:
from travel.constants import QUALTRICS_API_TOKEN, QUALTRICS_DATA_CENTER, QUALTRICS_LIBRARY_NAME


test_image = Image.new('RGB', (100, 100), color='red')
test_descriptions = ["Test procedure: Do something simple"]

result = generate_annotation_survey(
    api_token=QUALTRICS_API_TOKEN,
    data_center=QUALTRICS_DATA_CENTER,
    library_name=QUALTRICS_LIBRARY_NAME,
    images=[test_image],
    task_descriptions=test_descriptions,
    survey_name="Test Survey",
    publish=True
)

Created survey with ID: SV_5tYnm6Ld4dUdw0e
Processing question q001...
{'libraryId': 'UR_2sCnXQ8e2Seuvxc', 'libraryName': 'Shane Storks'}
✓ Created mistake detection question q001
✓ Created confidence question for q001

📋 Survey created successfully. Survey ID: SV_5tYnm6Ld4dUdw0e
Manual URL: https://umich.qualtrics.com/jfe/form/SV_5tYnm6Ld4dUdw0e
