In [6]:
import os
import random
import pygame
import time
import csv

# params
positive_folder = "./data/positive"
negative_folder = "./data/negative"
num_trials = 10  
phase_times = [1, 1, 2]  # image-shown time(3 phases)
response_time_limit = 2  # decision time(limit)
results_file = "experiment_data.csv"

# load image 
def load_images(folder):
    images = [os.path.join(folder, f) for f in os.listdir(folder) if f.endswith(".jpg")]
    return [image.replace("\\", "/") for image in images]

positive_images = load_images(positive_folder)
negative_images = load_images(negative_folder)

# coherence dict (init: 0)
positive_coherences = {img: 0 for img in positive_images}
negative_coherences = {img: 0 for img in negative_images}

# balance #positve-img / #negative-img
num_positive = num_trials // 2
num_negative = num_trials - num_positive
selected_images = random.sample(positive_images, num_positive) + \
                  random.sample(negative_images, num_negative)
random.shuffle(selected_images)

# load coherence from .csv
def load_coherence_values(file_path):
    coherences = {}
    with open(file_path, "r") as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for row in reader:
            image_path, coherence = row
            coherences[image_path] = float(coherence)
    return coherences

positive_coherences = load_coherence_values("./coherences_positive.csv")
negative_coherences = load_coherence_values("./coherences_negative.csv")

# init pygame params
pygame.init()
screen = pygame.display.set_mode((800, 600))
pygame.display.set_caption("Emotion Decision (press 1/0 for positive/negative)")
font = pygame.font.SysFont("Arial", 32)
clock = pygame.time.Clock()

# useful funs
def show_text(screen, text, duration):
    """show a text"""
    screen.fill((0, 0, 0))
    text_surface = font.render(text, True, (255, 255, 255))
    rect = text_surface.get_rect(center=(400, 300))
    screen.blit(text_surface, rect)
    pygame.display.flip()
    pygame.time.delay(int(duration * 1000))

def check_keyboard_response(valid_keys, start_time):
    """
    detect keyboard responce
    - valid_keys: 1 / 0
    - start_time: for RT computing
    return: (key_res, RT) or (none, none)  
    """
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            pygame.quit()
            exit()
        elif event.type == pygame.KEYDOWN:
            if event.key in valid_keys:
                decision = "1" if event.key == pygame.K_1 else "0"
                rt = (time.time() - start_time)  # second
                return decision, rt
    return None, None

# experiment steps
results = []
show_text(screen, "WELCOME!", 2)

for trial_idx, img_path in enumerate(selected_images):
    label = "positive" if "positive" in img_path else "negative"
    coherence = positive_coherences[img_path] if label == "positive" else negative_coherences[img_path]
    img = pygame.image.load(img_path).convert()
    img = pygame.transform.scale(img, (400, 400))  # reshape

    decision = None
    rt = None
    response_captured = False

    for phase, phase_time in enumerate(phase_times):
        start_time = time.time()

        while time.time() - start_time < phase_time:
            screen.fill((0, 0, 0))

            if phase == 0:  # show upper part
                upper_part = img.subsurface(pygame.Rect(0, 0, 400, 200))
                cropped_upper = pygame.transform.scale(upper_part, (300, 110))  
                screen.blit(cropped_upper, (220, 210))
            elif phase == 1:  # show lower part
                lower_part = img.subsurface(pygame.Rect(0, 200, 400, 200))
                cropped_lower = pygame.transform.scale(lower_part, (250, 130)) 
                screen.blit(cropped_lower, (220, 210))
            else:  # whole img
                screen.blit(img, (200, 100))

            pygame.display.flip()

            # keyboard event
            decision, rt = check_keyboard_response([pygame.K_1, pygame.K_0], start_time)
            if decision is not None:  # skip to next trial
                response_captured = True
                break

        if response_captured:
            break

    # limit decision time
    if not response_captured:
        start_time = time.time()
        while time.time() - start_time < response_time_limit:
            decision, rt = check_keyboard_response([pygame.K_1, pygame.K_0], start_time)
            if decision is not None:
                break

    correct = (label == "positive" and decision == "1") or (label == "negative" and decision == "0")
    results.append([trial_idx + 1, img_path, label, coherence, rt, decision, correct])

    # exp interval
    show_text(screen, "+", 0.5)

# save results
file_exists = os.path.exists(results_file)
with open(results_file, "a", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    if not file_exists:  # write the header row
        writer.writerow(["trial_id", "img_path", "img_label", "Coherence", "RT", "decision", "correct"])  # RT(s)
    writer.writerows(results)

# end of exp (show results)
correct_trials = sum(1 for r in results if r[-1])
avg_rt = sum(r[4] for r in results if r[4] is not None) / sum(1 for r in results if r[4] is not None)
show_text(screen, f"Experiment over! Accuracy: {correct_trials / len(results):.2%} Mean RT: {avg_rt:.2f}s", 3)

pygame.quit()
