In [1]:
import numpy as np
import pandas as pd

# Constants

In [2]:
# Directory the generated sequence CSV files are written to.
seq_folder = "/Users/ccnlab/Development/sequences/shaping/v2/"

# Screen positions, in the order used to index per-position image/key columns.
ALL_DIRECTIONS = ["top", "bottom", "left", "right"]

# Column layout of every exported CSV: trial-level fields first, then one
# image column and one key column per screen position.
OUTPUT_COL_ORDER = (
    ["stim", "correct_key", "block", "img_folder"]
    + [f"{direction}_img" for direction in ALL_DIRECTIONS]
    + [f"{direction}_key" for direction in ALL_DIRECTIONS]
)

# Helpers

In [3]:
def generate_sequence(num_stims, num_iter_per_stim):
    """
    Generate a random sequence where each stimulus appears exactly
    num_iter_per_stim times and consecutive stimuli are different.

    The next stimulus is drawn at random, weighted by how many repetitions each
    candidate still has left. Whenever a single candidate owns more than half
    of the remaining slots it is placed unconditionally: picking anything else
    would leave too few other stimuli to separate its repetitions. With this
    rule the construction cannot dead-end when a valid sequence exists.

    Args:
        num_stims: Number of different stimuli (0 to num_stims-1)
        num_iter_per_stim: Number of times each stimulus should appear

    Returns:
        List of ints of length num_stims * num_iter_per_stim satisfying the
        constraints (an empty list when either argument is 0).

    Raises:
        ValueError: If an argument is negative, or if no valid sequence exists
            (a single stimulus that must appear more than once).
    """
    if num_stims < 0 or num_iter_per_stim < 0:
        raise ValueError("num_stims and num_iter_per_stim must be non-negative")
    if num_stims == 0 or num_iter_per_stim == 0:
        # Nothing to place; previously this returned a stray one-element list
        # (num_iter_per_stim == 0) or failed inside numpy (num_stims == 0).
        return []
    if num_stims == 1 and num_iter_per_stim > 1:
        raise ValueError("Cannot generate valid sequence with given constraints")

    # Track remaining count for each stimulus and the total still to place
    remaining_counts = [num_iter_per_stim] * num_stims
    slots_left = num_stims * num_iter_per_stim

    # Start with a random stimulus
    current_stim = int(np.random.choice(num_stims))
    sequence = [current_stim]
    remaining_counts[current_stim] -= 1
    slots_left -= 1

    while slots_left > 0:
        # Candidates: not the last stimulus and still have remaining count
        available_stims = [
            stim
            for stim in range(num_stims)
            if stim != sequence[-1] and remaining_counts[stim] > 0
        ]

        if not available_stims:
            # Defensive: the forced-move rule below should make this unreachable
            raise ValueError("Cannot generate valid sequence with given constraints")

        # A candidate holding more than half of the remaining slots must go
        # next (at most one candidate can satisfy this at a time).
        forced = [
            stim for stim in available_stims if 2 * remaining_counts[stim] > slots_left
        ]
        if forced:
            next_stim = forced[0]
        else:
            # Prefer stimuli with higher remaining counts to keep things balanced
            weights = np.array(
                [remaining_counts[stim] for stim in available_stims], dtype=float
            )
            next_stim = int(
                np.random.choice(available_stims, p=weights / weights.sum())
            )

        sequence.append(next_stim)
        remaining_counts[next_stim] -= 1
        slots_left -= 1

    return sequence


def generate_sequence_optimized(num_stims, num_iter_per_stim, max_attempts=100):
    """
    Retry wrapper around generate_sequence.

    Calls generate_sequence up to max_attempts times and returns the first
    result that does not raise ValueError. If every attempt raises, the result
    of generate_sequence_deterministic is returned instead.
    """
    for _ in range(max_attempts):
        try:
            candidate = generate_sequence(num_stims, num_iter_per_stim)
        except ValueError:
            # This attempt hit a dead end; try again with fresh random draws.
            continue
        else:
            return candidate

    # All random attempts failed: hand over to the fallback generator.
    return generate_sequence_deterministic(num_stims, num_iter_per_stim)


def generate_sequence_deterministic(num_stims, num_iter_per_stim):
    """
    Fallback generator that always produces a valid sequence when one exists.

    Every stimulus appears exactly num_iter_per_stim times and no stimulus is
    immediately repeated. The sequence always starts with stimulus 0; the rest
    is still drawn at random (weighted by remaining repetitions), so despite
    the name the output is not the same on every call. Whenever one candidate
    holds more than half of the remaining slots it is placed unconditionally,
    which is what rules out dead ends.

    Args:
        num_stims: Number of different stimuli (0 to num_stims-1)
        num_iter_per_stim: Number of times each stimulus should appear

    Returns:
        List of ints of length num_stims * num_iter_per_stim (an empty list
        when either argument is 0 or negative).

    Raises:
        ValueError: If no valid sequence exists (a single stimulus that must
            appear more than once).
    """
    if num_stims <= 0 or num_iter_per_stim <= 0:
        # Previously this crashed with IndexError on base_sequence[0].
        return []
    if num_stims == 1 and num_iter_per_stim > 1:
        raise ValueError("Cannot generate valid sequence with given constraints")

    # Remaining repetitions per stimulus, after placing stimulus 0 first
    remaining_counts = [num_iter_per_stim] * num_stims
    sequence = [0]
    remaining_counts[0] -= 1
    slots_left = num_stims * num_iter_per_stim - 1

    while slots_left > 0:
        candidates = [
            stim
            for stim in range(num_stims)
            if stim != sequence[-1] and remaining_counts[stim] > 0
        ]

        # If a candidate owns more than half of what is left, anything else
        # would strand its repetitions next to each other, so it goes next.
        forced = [stim for stim in candidates if 2 * remaining_counts[stim] > slots_left]
        if forced:
            chosen = forced[0]
        else:
            # Equivalent to picking uniformly among the remaining valid items
            weights = np.array(
                [remaining_counts[stim] for stim in candidates], dtype=float
            )
            chosen = int(np.random.choice(candidates, p=weights / weights.sum()))

        sequence.append(chosen)
        remaining_counts[chosen] -= 1
        slots_left -= 1

    return sequence


def generate_food_map(num_food, num_trials):
    """
    Build one random arrangement of the food indices per trial.

    Returns a list of num_trials lists, each a permutation of
    0..num_food-1. Every arrangement is obtained by shuffling a copy of the
    previous one (the first starts from the ordered list).
    """
    arrangement = list(range(num_food))
    per_trial = []
    for _ in range(num_trials):
        # Work on a copy so the arrangements already stored stay untouched.
        arrangement = arrangement.copy()
        np.random.shuffle(arrangement)
        per_trial.append(arrangement)

    return per_trial


def generate_shaping_block(num_directions, num_iter_per_stim, correct_key_mapping):
    """
    Build the trial columns for one shaping block.

    A sequence of target positions is generated (each position
    num_iter_per_stim times), together with one random food arrangement per
    trial. For each trial the stimulus is the food sitting at the target
    position and the correct key is the key mapped to that position's
    direction.

    Args:
        num_directions: Number of positions / foods per trial.
        num_iter_per_stim: How often each target position occurs.
        correct_key_mapping: Dict from direction name to key value.

    Returns:
        Dict of column name -> list with one entry per trial.
    """
    target_positions = generate_sequence_optimized(num_directions, num_iter_per_stim)
    arrangements = generate_food_map(num_directions, len(target_positions))

    seq_data = {
        "stim": [],
        "correct_key": [],
        "top_img": [],
        "bottom_img": [],
        "left_img": [],
        "right_img": [],
    }
    for target_pos, arrangement in zip(target_positions, arrangements):
        target_direction = ALL_DIRECTIONS[target_pos]
        seq_data["stim"].append(arrangement[target_pos])
        seq_data["correct_key"].append(correct_key_mapping[target_direction])
        # Record which food is shown at every position on this trial.
        for pos, food in enumerate(arrangement):
            seq_data[f"{ALL_DIRECTIONS[pos]}_img"].append(food)

    # The direction -> key assignment is constant within the block.
    num_trials = len(target_positions)
    for direction, key in correct_key_mapping.items():
        seq_data[f"{direction}_key"] = [key] * num_trials

    return seq_data


def generate_top_shaping_block(num_iter_per_stim, stim_food_mapping):
    """
    Build the trial columns for a block with a fixed position -> key layout.

    Two independent sequences are generated: the stimulus shown on each trial
    and the position at which that stimulus' food appears. The food is placed
    at that position, the remaining foods are shuffled over the other
    positions, and the position index itself is stored as the correct key.
    The key columns are fixed to top=0, bottom=1, left=2, right=3.

    Args:
        num_iter_per_stim: How often each stimulus (and each position) occurs.
        stim_food_mapping: Dict from stimulus index to food index.

    Returns:
        Dict of column name -> list with one entry per trial.
    """
    num_stim = len(stim_food_mapping)
    # Draw order matters for the random stream: positions first, then stimuli.
    correct_key_seq = generate_sequence_optimized(num_stim, num_iter_per_stim)
    stim_seq = generate_sequence_optimized(num_stim, num_iter_per_stim)

    seq_data = {
        "stim": stim_seq,
        "correct_key": [],
        "top_img": [],
        "bottom_img": [],
        "left_img": [],
        "right_img": [],
    }

    for stim, target_pos in zip(stim_seq, correct_key_seq):
        target_food = stim_food_mapping[stim]
        # Move the target food to its position ...
        placed = swap_by_indices(np.array([0, 1, 2, 3]), target_food, target_pos)
        # ... and shuffle every other position around it.
        keep_fixed = np.array([pos == target_pos for pos in range(num_stim)])
        arrangement = shuffle_with_mask(placed, keep_fixed)
        for pos, food in enumerate(arrangement):
            seq_data[f"{ALL_DIRECTIONS[pos]}_img"].append(food)
        seq_data["correct_key"].append(target_pos)

    num_trials = len(correct_key_seq)
    for direction, key in zip(ALL_DIRECTIONS, [0, 1, 2, 3]):
        seq_data[f"{direction}_key"] = [key] * num_trials

    return seq_data


def generate_non_shaping_block(
    num_directions, num_iter_per_stim, correct_key_mapping, stim_food_mapping
):
    """
    Build the trial columns for one non-shaping block.

    Two independent sequences are generated: the correct direction for each
    trial and the stimulus for each trial. On every trial the stimulus' food
    is placed at the correct direction's position, the other foods are
    shuffled over the remaining positions, and the correct key is the key
    mapped to that direction.

    Args:
        num_directions: Number of positions per trial.
        num_iter_per_stim: How often each direction (and each stimulus) occurs.
        correct_key_mapping: Dict from direction name to key value.
        stim_food_mapping: Dict from stimulus index to food index.

    Returns:
        Dict of column name -> list with one entry per trial.
    """
    # Draw order matters for the random stream: directions first, then stimuli.
    correct_dir_seq = generate_sequence_optimized(num_directions, num_iter_per_stim)
    stim_seq = generate_sequence_optimized(num_directions, num_iter_per_stim)

    seq_data = {
        "stim": stim_seq,
        "correct_key": [],
        "top_img": [],
        "bottom_img": [],
        "left_img": [],
        "right_img": [],
    }
    all_food_indexes = np.array([0, 1, 2, 3])

    for target_pos, stim in zip(correct_dir_seq, stim_seq):
        target_food = stim_food_mapping[stim]
        # Put the stimulus' food at the correct position, shuffle the rest.
        placed = swap_by_indices(all_food_indexes, target_food, target_pos)
        keep_fixed = np.array([pos == target_pos for pos in range(num_directions)])
        arrangement = shuffle_with_mask(placed, keep_fixed)
        for pos, food in enumerate(arrangement):
            seq_data[f"{ALL_DIRECTIONS[pos]}_img"].append(food)
        seq_data["correct_key"].append(correct_key_mapping[ALL_DIRECTIONS[target_pos]])

    # The direction -> key assignment is constant within the block.
    num_trials = len(correct_dir_seq)
    for direction, key in correct_key_mapping.items():
        seq_data[f"{direction}_key"] = [key] * num_trials

    return seq_data


def shuffle_with_mask(arr, mask):
    """
    Return a copy of arr whose unmasked entries are randomly permuted.

    Positions where mask is True keep their value; the values at positions
    where mask is False are shuffled among those positions. The input array
    is left unmodified.
    """
    result = arr.copy()
    free_positions = np.nonzero(~mask)[0]

    # With zero or one free position there is nothing to permute.
    if free_positions.size > 1:
        free_values = result[free_positions]
        np.random.shuffle(free_values)
        result[free_positions] = free_values

    return result


def swap_by_indices(arr, target_value, target_index):
    """
    Return a copy of arr with target_value moved to target_index.

    The first occurrence of target_value is located and exchanged with
    whatever currently sits at target_index. The input array is not modified.
    """
    swapped = arr.copy()
    source_index = np.where(swapped == target_value)[0][0]
    displaced = swapped[target_index]
    swapped[target_index] = swapped[source_index]
    swapped[source_index] = displaced
    return swapped


def generate_key_food_mapping(num_directions, num_food):
    """
    Draw a random direction -> key mapping and a random stimulus -> food mapping.

    Returns:
        (correct_key_mapping, stim_food_mapping): the first maps each direction
        name to an entry of a random permutation of 0..num_directions-1, the
        second maps each stimulus index to an entry of a random permutation of
        0..num_food-1.
    """
    # Draw order matters for the random stream: keys first, then foods.
    key_order = np.random.permutation(num_directions)
    correct_key_mapping = {
        ALL_DIRECTIONS[pos]: key for pos, key in enumerate(key_order)
    }

    food_order = np.random.permutation(num_food)
    stim_food_mapping = dict(enumerate(food_order))
    return correct_key_mapping, stim_food_mapping

In [4]:
from itertools import combinations

# Enumerate every unordered pair of image-folder indices (0-based).
NUM_IMG_FOLDERS = 4
items = np.arange(NUM_IMG_FOLDERS)
combos = [*combinations(items, 2)]
print(combos)
# Output: [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]

# Same pairs as lists instead of tuples
combos_as_lists = list(map(list, combos))
print(combos_as_lists)
# Output: [[0, 1], [0, 2], [0, 3], [1, 2], [1, 3], [2, 3]]

[(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
[[0, 1], [0, 2], [0, 3], [1, 2], [1, 3], [2, 3]]


# Shaping sequence

In [6]:
def generate_shaping_round(
    num_directions, iterations, correct_key_mapping, _, img_set, bs, last_nonshaping_block
):
    """
    Build one two-block round: a shaping block followed by the given final block.

    The shaping block uses iterations[0] repetitions per position. The blocks
    are numbered bs * 2 and bs * 2 + 1, and both get img_folder = img_set + 1.
    The fourth positional argument is accepted but unused, so the signature
    lines up with the other round builders.

    Returns:
        A DataFrame with the two blocks concatenated; last_nonshaping_block
        itself is not modified.
    """
    folder = img_set + 1
    first_block = bs * 2

    shaping_block = pd.DataFrame(
        generate_shaping_block(num_directions, iterations[0], correct_key_mapping)
    ).assign(block=first_block, img_folder=folder)
    # assign() works on a copy, so the caller's frame stays untouched.
    nonshaping_block = last_nonshaping_block.assign(
        block=first_block + 1, img_folder=folder
    )
    return pd.concat([shaping_block, nonshaping_block])

def generate_shaping_round_v2(
    num_directions, iterations, correct_key_mapping, stim_food_mapping, img_set, bs, last_nonshaping_block
):
    """
    Build one three-block round: top-shaping, shaping, then the given final block.

    The top-shaping block uses iterations[0] repetitions and the shaping block
    iterations[1]. Blocks are numbered bs * 3, bs * 3 + 1 and bs * 3 + 2, and
    all get img_folder = img_set + 1.

    Returns:
        A DataFrame with the three blocks concatenated; last_nonshaping_block
        itself is not modified.
    """
    folder = img_set + 1
    first_block = bs * 3

    # Generation order is kept (top-shaping before shaping) so the random
    # stream is consumed in the same order.
    top_shaping_block = pd.DataFrame(
        generate_top_shaping_block(iterations[0], stim_food_mapping)
    ).assign(block=first_block, img_folder=folder)
    shaping_block = pd.DataFrame(
        generate_shaping_block(num_directions, iterations[1], correct_key_mapping)
    ).assign(block=first_block + 1, img_folder=folder)
    # assign() works on a copy, so the caller's frame stays untouched.
    nonshaping_block = last_nonshaping_block.assign(
        block=first_block + 2, img_folder=folder
    )
    return pd.concat([top_shaping_block, shaping_block, nonshaping_block])


def generate_nonshaping_round(
    num_directions, iterations, correct_key_mapping, stim_food_mapping, img_set, bs, last_nonshaping_block
):
    """
    Build one round made only of non-shaping blocks.

    One non-shaping block is generated per entry of iterations (that entry
    being the repetitions per stimulus), followed by a copy of
    last_nonshaping_block. All blocks get img_folder = img_set + 1.

    Block numbering: a round contains len(iterations) + 1 blocks, so round bs
    starts at bs * (len(iterations) + 1). The previous stride of
    bs * len(iterations) ignored the final block, which made the last block of
    one round share its number with the first block of the next round.

    Args:
        num_directions: Number of positions per trial.
        iterations: Repetitions per stimulus for each generated block.
        correct_key_mapping: Dict from direction name to key value.
        stim_food_mapping: Dict from stimulus index to food index.
        img_set: 0-based image-set index (stored as img_set + 1).
        bs: 0-based index of this round within the sequence.
        last_nonshaping_block: DataFrame appended (as a copy) as the final block.

    Returns:
        A DataFrame with all blocks of the round concatenated.
    """
    blocks_per_round = len(iterations) + 1
    first_block_num = bs * blocks_per_round
    folder = img_set + 1

    all_blocks = []
    for offset, num_iter_per_stim in enumerate(iterations):
        nonshaping_block = pd.DataFrame(
            generate_non_shaping_block(
                num_directions, num_iter_per_stim, correct_key_mapping, stim_food_mapping
            )
        )
        nonshaping_block["block"] = first_block_num + offset
        nonshaping_block["img_folder"] = folder
        all_blocks.append(nonshaping_block)

    # The shared final block takes the last number of this round; copy it so
    # the caller's frame is not modified.
    final_block = last_nonshaping_block.copy()
    final_block["block"] = first_block_num + len(iterations)
    final_block["img_folder"] = folder
    all_blocks.append(final_block)

    return pd.concat(all_blocks)


# Driver: for every pair of image sets, build a "shaping" and a "nonshaping"
# sequence and write each one to its own CSV file in seq_folder.
# NOTE(review): no random seed is set in the visible cells, so every run
# writes different sequences.
NUM_DIRECTIONS = 4
NUM_FOOD = 4
version = "v2"
# Repetitions per stimulus in the final block that both sequence types share.
NUM_ITER_PER_STIM = 13
# Pick the per-block repetition counts and the shaping-round builder.
# NOTE(review): there is no else branch, so any other `version` value leaves
# `iterations` / `shaping_func` undefined (or stale from an earlier run).
if version == "v1":
    iterations = [6]
    shaping_func = generate_shaping_round
elif version == "v2":
    iterations = [4, 4]
    shaping_func = generate_shaping_round_v2

# potential image sets [0, 1], [2, 3], [0,2], [1,3], [0,3], [1,2]
for seq_idx, set_comb in enumerate(combos_as_lists):
    shaping_blocks = []
    nonshaping_blocks = []
    # One round per image set in the pair; bs is the round index (0 or 1).
    for bs, img_set in enumerate(set_comb):
        # Fresh random key and food mappings for this image set.
        correct_key_mapping, stim_food_mapping = generate_key_food_mapping(
            NUM_DIRECTIONS, NUM_FOOD
        )
        # Final block of the round; the same frame is handed to both the
        # shaping and the non-shaping round builders below.
        last_nonshaping_block = pd.DataFrame(
            generate_non_shaping_block(
                NUM_DIRECTIONS, NUM_ITER_PER_STIM, correct_key_mapping, stim_food_mapping
            )
        )
        shaping_blocks.append(
            shaping_func(
                NUM_DIRECTIONS,
                iterations,
                correct_key_mapping,
                stim_food_mapping,
                img_set,
                bs,
                last_nonshaping_block,
            )
        )
        nonshaping_blocks.append(
            generate_nonshaping_round(
                NUM_DIRECTIONS,
                iterations,
                correct_key_mapping,
                stim_food_mapping,
                img_set,
                bs,
                last_nonshaping_block,
            )
        )

    # Write one CSV per sequence type for this image-set pair.
    for name, data in zip(
        ["shaping", "nonshaping"], [shaping_blocks, nonshaping_blocks]
    ):
        concated_data = pd.concat(data)
        # Shift the block numbers by one for the exported file.
        concated_data["block"] = concated_data["block"] + 1
        # Keep only the export columns, in the export order.
        concated_data = concated_data[OUTPUT_COL_ORDER]
        concated_data.to_csv(f"{seq_folder}/{name}{seq_idx}_learning.csv", index=False)

# Preview of the last frame written (the "nonshaping" file of the last pair).
concated_data.head(4)

Unnamed: 0,stim,correct_key,block,img_folder,top_img,bottom_img,left_img,right_img,top_key,bottom_key,left_key,right_key
0,2,3,1,3,0,1,3,2,0,3,1,2
1,0,2,1,3,1,3,0,2,0,3,1,2
2,1,3,1,3,2,0,3,1,0,3,1,2
3,2,2,1,3,2,3,0,1,0,3,1,2
