This script:

1. Creates sequences of stimuli excluding unwanted repetitions of letters and locations
2. Statistically assesses whether each sequence is good enough and deletes the sequence if it is not
3. Creates versions of each sequence with different distributions of tasks (?)
4. Checks whether the tasks and answers are well distributed

In [1]:
import random
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import numpy as np
import pandas as pd
import os
import ast
import collections
import statistics
import re
from collections import Counter


In [2]:
# Folder where generated sequence files are written and later read back.
# Keep the trailing path separator: the generation cell builds file names by
# plain string concatenation with this value.
seq_path_to_save = '...\\sequences\\' #! SET YOUR PATH HERE

**CREATING SEQUENCES**

In [3]:
# Path to the CSV file containing ALL_GRIDS
csv_path = '...\\all_grids.csv' #! SET YOUR PATH HERE
# The file is semicolon-delimited; its first column becomes the row index
df = pd.read_csv(csv_path, delimiter=';', index_col=0) 
# Notebook preview of the first rows
df.head()

Unnamed: 0,grid,letters,locs,encoding_stim,incorr_locs,corr_letters,incorr_letters,rotated_inds,incorr_rot,abc_ordered,incorr_abc
0,"[array([['Л', 'Ж', 'П'],\n ['Т', 'Х', 'Х...","['Л', 'Ж', 'П', 'Т']","[0, 1, 2, 3]",grid_1331.png,"[5, 6, 7, 8]","{'Л': 0, 'Ж': 1, 'П': 2, 'Т': 3}","{'Л': [1, 2, 3], 'Ж': [0, 2, 3], 'П': [0, 1, 3...","[2, 5, 8, 1]","[0, 3, 6, 7]","{'Ж': 0, 'Л': 1, 'П': 2, 'Т': 3}","{'Ж': [1, 2], 'Л': [0, 2], 'П': [1, 3], 'Т': [..."
1,"[array([['Ж', 'Л', 'Т'],\n ['Х', 'Х', 'П...","['Ж', 'Л', 'Т', 'П']","[0, 1, 2, 5]",grid_1332.png,"[3, 6, 7, 8]","{'Ж': 0, 'Л': 1, 'Т': 2, 'П': 3}","{'Ж': [1, 2, 3], 'Л': [0, 2, 3], 'Т': [0, 1, 3...","[2, 5, 8, 7]","[0, 1, 3, 6]","{'Ж': 0, 'Л': 1, 'П': 2, 'Т': 3}","{'Ж': [1, 2], 'Л': [0, 2], 'П': [1, 3], 'Т': [..."
2,"[array([['П', 'Л', 'Ж'],\n ['Х', 'Х', 'Х...","['П', 'Л', 'Ж', 'Т']","[0, 1, 2, 6]",grid_1333.png,"[3, 5, 7, 8]","{'П': 0, 'Л': 1, 'Ж': 2, 'Т': 3}","{'П': [1, 2, 3], 'Л': [0, 2, 3], 'Ж': [0, 1, 3...","[2, 5, 8, 0]","[1, 3, 6, 7]","{'Ж': 0, 'Л': 1, 'П': 2, 'Т': 3}","{'Ж': [1, 2], 'Л': [0, 2], 'П': [1, 3], 'Т': [..."
3,"[array([['Ж', 'Л', 'Т'],\n ['Х', 'Х', 'Х...","['Ж', 'Л', 'Т', 'П']","[0, 1, 2, 7]",grid_1334.png,"[3, 5, 6, 8]","{'Ж': 0, 'Л': 1, 'Т': 2, 'П': 3}","{'Ж': [1, 2, 3], 'Л': [0, 2, 3], 'Т': [0, 1, 3...","[2, 5, 8, 3]","[0, 1, 6, 7]","{'Ж': 0, 'Л': 1, 'П': 2, 'Т': 3}","{'Ж': [1, 2], 'Л': [0, 2], 'П': [1, 3], 'Т': [..."
4,"[array([['Т', 'П', 'Ж'],\n ['Х', 'Х', 'Х...","['Т', 'П', 'Ж', 'Л']","[0, 1, 2, 8]",grid_1335.png,"[3, 5, 6, 7]","{'Т': 0, 'П': 1, 'Ж': 2, 'Л': 3}","{'Т': [1, 2, 3], 'П': [0, 2, 3], 'Ж': [0, 1, 3...","[2, 5, 8, 6]","[0, 1, 3, 7]","{'Ж': 0, 'Л': 1, 'П': 2, 'Т': 3}","{'Ж': [1, 2], 'Л': [0, 2], 'П': [1, 3], 'Т': [..."


In [None]:
# Rules for seqs:
# 1. Length - 300 rows (trials)
# 2. Fewer than 3 coinciding locations in two consecutive trials
# 3. No coinciding letters in two consecutive trials

SEQ_LENGTH = 300       # number of trials in every saved sequence
MAX_SHARED_LOCS = 2    # consecutive trials may share at most this many locations

# Length each sequence reached after the first pass over the shuffled grids
info = []

# The 'locs' and 'letters' cells hold stringified lists; parse them once up
# front instead of on every pairwise comparison.
loc_sets = [set(ast.literal_eval(cell)) for cell in df['locs']]
letter_sets = [set(ast.literal_eval(cell)) for cell in df['letters']]


def _can_follow(prev_pos: int, cand_pos: int) -> bool:
    """Return True if grid `cand_pos` may directly follow grid `prev_pos`.

    Both arguments are positional row indices into `df`. A candidate is
    accepted when it shares at most MAX_SHARED_LOCS locations and no letters
    with the previous trial.
    """
    shared_locs = len(loc_sets[prev_pos] & loc_sets[cand_pos])
    return (shared_locs <= MAX_SHARED_LOCS
            and letter_sets[prev_pos].isdisjoint(letter_sets[cand_pos]))


# Loop for generating sequences
for seq_number in range(0, 15): # Set desired number of sequences. NOTE: set higher number than you need as many seqs will be excluded due to the criteria below
    # Generate sequence name
    sequence_name = f'sequence_{seq_number+1}'

    # Random visiting order over all grids (positional indices into df)
    total_rows = df.shape[0]
    order = random.sample(range(total_rows), total_rows)

    # First pass: greedily chain the shuffled grids. The first grid seeds the
    # chain; grids that cannot follow the current last trial are set aside.
    chain = [order[0]]
    rejected = []
    for pos in order[1:]:
        if _can_follow(chain[-1], pos):
            chain.append(pos)
        else:
            rejected.append(pos)

    info.append(len(chain))

    # Second pass: if the chain is still too short, reshuffle the rejected
    # grids and try each of them again against the (now different) last
    # trial. Every rejected grid is visited, including the first one.
    if len(chain) < SEQ_LENGTH:
        random.shuffle(rejected)
        for pos in rejected:
            if _can_follow(chain[-1], pos):
                chain.append(pos)
                if len(chain) == SEQ_LENGTH:
                    break

    # Sequences that never reach the required length are dropped
    if len(chain) < SEQ_LENGTH:
        continue

    # Build the DataFrame in one go (DataFrame.append no longer exists in
    # pandas >= 2.0) and keep exactly SEQ_LENGTH trials.
    sequence = df.iloc[chain[:SEQ_LENGTH]]

    # Save only if no trial occurs twice in the sequence
    if not sequence.duplicated().any():
        sequence.to_csv(f'{seq_path_to_save}{sequence_name}.csv', sep=';', index=True, encoding='utf-8-sig')

# Print the original lengths of the sequences
print(f'Original lengths of sequences were: {info}')



**SEQUENCE STATISTICS**

In [3]:
def _counts_within_two_std(counter: collections.Counter) -> bool:
    """Return True if every count deviates from the mean by less than 2 sample STDs."""
    counts = list(counter.values())
    mean = statistics.mean(counts)
    two_std = 2 * statistics.stdev(counts)

    # All counts identical: the distribution is perfectly balanced. Without
    # this guard the comparison below would be `0 >= 0` and reject it.
    if two_std == 0:
        return True

    return all(abs(count - mean) < two_std for count in counts)


def good_sequence(letter_counter: collections.Counter, locs_counter: collections.Counter) -> bool:
    """Decide whether letters and locations are evenly distributed in a sequence.

    A sequence is good when no letter count and no location count deviates
    from the respective mean count by 2 sample standard deviations or more.
    Perfectly uniform counts (standard deviation of 0) are good.

    Args:
        letter_counter: number of occurrences of each letter in the sequence.
        locs_counter: number of occurrences of each location in the sequence.

    Returns:
        True if both distributions pass the check, False otherwise.

    Raises:
        statistics.StatisticsError: if either counter has fewer than two entries.
    """
    return _counts_within_two_std(letter_counter) and _counts_within_two_std(locs_counter)

In [9]:
# Get the sequence CSV files in the specified path. Selecting by extension
# (instead of dropping the last directory entry) does not depend on the order
# in which os.listdir returns entries and skips the 'stats' sub-folder.
sequence_files = sorted(
    name for name in os.listdir(seq_path_to_save) if name.lower().endswith('.csv')
)

# Folder for the distribution plots; create it if it is missing
stats_dir = os.path.join(seq_path_to_save, 'stats')
os.makedirs(stats_dir, exist_ok=True)

# Iterate over each sequence file
for file_name in sequence_files:
    seq_name = file_name[:-4]  # file name without the '.csv' extension

    # Read the sequence CSV file into a DataFrame
    seq_df = pd.read_csv(os.path.join(seq_path_to_save, file_name), delimiter=';', index_col=0)

    # Count occurrences of each letter and location over all grids.
    # Both columns hold stringified lists.
    letter_counter = collections.Counter()
    locs_counter = collections.Counter()
    for letters_cell, locs_cell in zip(seq_df['letters'], seq_df['locs']):
        letter_counter.update(ast.literal_eval(letters_cell))
        locs_counter.update(int(loc) for loc in ast.literal_eval(locs_cell))

    # Check if the sequence is good based on sequence stats
    good_seq = good_sequence(letter_counter, locs_counter)

    # If the sequence is good, print its length and save statistics plots
    if good_seq:
        print(f'Length of {seq_name}: {len(seq_df)}')
        plt.bar(list(letter_counter.keys()), list(letter_counter.values()))
        plt.savefig(os.path.join(stats_dir, f'{seq_name}_letters.png'), bbox_inches='tight', pad_inches=0, dpi=300)
        plt.close()
        plt.bar(list(locs_counter.keys()), list(locs_counter.values()))
        plt.savefig(os.path.join(stats_dir, f'{seq_name}_locs.png'), bbox_inches='tight', pad_inches=0, dpi=300)
        plt.close()
    # If the sequence is not good, print a message and remove the sequence file from the folder
    else:
        print(f'{seq_name} is a bad sequence!')
        os.remove(os.path.join(seq_path_to_save, file_name))


Length of sequence_1: 300
Length of sequence_10: 300
Length of sequence_11: 300
Length of sequence_12: 300
sequence_13 is a bad sequence!
sequence_14 is a bad sequence!
Length of sequence_15: 300
Length of sequence_16: 300
sequence_18 is a bad sequence!
sequence_19 is a bad sequence!
Length of sequence_2: 300
sequence_21 is a bad sequence!
Length of sequence_22: 300
Length of sequence_23: 300
Length of sequence_25: 300
sequence_26 is a bad sequence!
Length of sequence_27: 300
sequence_28 is a bad sequence!
sequence_29 is a bad sequence!
sequence_30 is a bad sequence!
sequence_31 is a bad sequence!
sequence_33 is a bad sequence!
Length of sequence_34: 300
sequence_36 is a bad sequence!
Length of sequence_37: 300
Length of sequence_38: 300
Length of sequence_40: 300
Length of sequence_41: 300
Length of sequence_42: 300
Length of sequence_43: 300
Length of sequence_44: 300
sequence_46 is a bad sequence!
sequence_48 is a bad sequence!
sequence_49 is a bad sequence!
Length of sequence_50: 3

**TASKS FOR EACH SEQUENCE**

Indexing of tasks:

'perc': 0

'pr_loc': 1

'pr_let': 2

'pr_rot': 3

'pr_abc': 4

'answer_type' (correctness of the probe): np.nan - if perception, 0 - incorrect, 1 - correct

In [4]:
def set_task_type(
    seq_df: pd.DataFrame,
    num_rows: int, integers: list[int],
    percentage_per_integer: float
            ) -> list[int]:
    """Build a shuffled list of task indices with equal shares per task.

    Args:
        seq_df: the sequence DataFrame (not used by the computation; kept for
            interface compatibility).
        num_rows: number of trials in the sequence.
        integers: the task indices to distribute.
        percentage_per_integer: fraction of `num_rows` assigned to each task
            index (e.g. 0.2 for five equally frequent tasks).

    Returns:
        A randomly ordered list in which every value of `integers` occurs
        int(num_rows * percentage_per_integer) times. The list is as long as
        `num_rows` only when the shares add up to the whole sequence.
    """
    # The small epsilon keeps float products that land just below a whole
    # number (e.g. 100 * 0.29 == 28.999999999999996) from being truncated to
    # one repetition too few.
    num_per_integer = int(num_rows * percentage_per_integer + 1e-9)

    # Repeat the task indices, then randomize their order
    tasks_list = list(integers) * num_per_integer
    random.shuffle(tasks_list)

    return tasks_list

In [5]:
def rand_zero_ones(num_rows: int) -> list[int]:
    """Return a shuffled list with equally many 0s (incorrect) and 1s (correct).

    The list holds `num_rows // 10` zeros and as many ones, i.e. one answer
    for every trial of a single task when each task covers a fifth of the
    sequence.
    """
    pair_count = num_rows // 10

    # Lay out the values as 0, 1, 0, 1, ... before randomizing the order
    answers = [bit for _ in range(pair_count) for bit in (0, 1)]
    random.shuffle(answers)

    return answers

In [6]:
# Generate correctness values for trials based on their task indices
def set_correctness(num_rows: int, seq_df: pd.DataFrame) -> list:
    """Assign a correct/incorrect flag to every trial of a sequence.

    Each probe task draws its flags from its own shuffled, balanced 0/1 pool
    produced by `rand_zero_ones(num_rows)`.

    Args:
        num_rows: number of trials to process.
        seq_df: sequence DataFrame with a 'task_ind' column.

    Returns:
        A list with one entry per trial: np.nan for task index 0 (perception),
        otherwise 0 (incorrect) or 1 (correct).

    Raises:
        IndexError: if a task occurs more often than its pool has values.
    """
    # One independent pool per probe task, created in task order. A deque
    # gives O(1) removal from the front.
    pools = [collections.deque(rand_zero_ones(num_rows)) for _ in range(4)]
    pool_by_task = {1: pools[0], 2: pools[1], 3: pools[2]}
    # Any task index other than 0-3 draws from the fourth pool
    last_pool = pools[3]

    task_column = seq_df['task_ind']
    answer_type = []

    for row in range(num_rows):
        task_ind = task_column.iloc[row]
        if task_ind == 0:
            # Task index 0 has no correct/incorrect answer
            answer_type.append(np.nan)
        else:
            answer_type.append(pool_by_task.get(task_ind, last_pool).popleft())

    return answer_type

In [17]:
def _location_probe(cell: str) -> int:
    """Pick a random location from a stringified list, never location 4."""
    candidates = [loc for loc in ast.literal_eval(cell) if loc != 4]
    return random.choice(candidates)


def _pattern_probe(letter: str, position: int) -> str:
    """Render a 4-slot pattern such as "'- Б - -" with `letter` at `position`."""
    slots = ['-'] * 4
    slots[position] = letter
    # The result starts with an apostrophe
    # (NOTE(review): presumably so spreadsheets treat the probe as text - confirm)
    return "'" + ' '.join(slots)


def _letter_probe(mapping: dict, correct: bool) -> str:
    """Pick a random letter from `mapping` and render it as a pattern probe.

    For a correct probe `mapping` maps letter -> position; for an incorrect
    probe it maps letter -> list of wrong positions, one of which is drawn.
    """
    letter = random.choice(list(mapping.keys()))
    position = mapping[letter] if correct else random.choice(mapping[letter])
    return _pattern_probe(letter, position)


def set_task(num_rows: int, seq_df: pd.DataFrame) -> list:
    """Create the probe stimulus for every trial of a sequence.

    Args:
        num_rows: number of trials to process.
        seq_df: sequence DataFrame with 'task_ind' and 'answer_type' columns
            plus the stringified grid columns read below.

    Returns:
        A list with exactly one probe per trial: np.nan for perception
        (task 0), a location index for location/rotation tasks (1, 3) and a
        pattern string for letter/alphabet tasks (2, 4).

    Raises:
        ValueError: if a trial has a task index outside 0-4, or a probe task
            whose answer type is neither 0 nor 1. Such a trial would otherwise
            be skipped and shift all following probes by one row.
    """
    task = []

    for row in range(num_rows):
        task_ind = seq_df['task_ind'].iloc[row]

        if task_ind == 0:  # perception
            task.append(np.nan)
            continue

        if task_ind not in (1, 2, 3, 4):
            raise ValueError(f'Row {row}: unknown task index {task_ind!r}')

        answer = seq_df['answer_type'].iloc[row]
        if answer == 1:
            correct = True
        elif answer == 0:
            correct = False
        else:
            raise ValueError(f'Row {row}: answer type {answer!r} is neither 0 nor 1')

        if task_ind == 1:  # location
            column = 'locs' if correct else 'incorr_locs'
            task.append(_location_probe(seq_df[column].iloc[row]))

        elif task_ind == 2:  # letter
            column = 'corr_letters' if correct else 'incorr_letters'
            mapping = ast.literal_eval(seq_df[column].iloc[row])
            task.append(_letter_probe(mapping, correct))

        elif task_ind == 3:  # rotation
            column = 'rotated_inds' if correct else 'incorr_rot'
            task.append(_location_probe(seq_df[column].iloc[row]))

        else:  # alphabet
            if correct:
                mapping = ast.literal_eval(seq_df['abc_ordered'].iloc[row])
                # These letters are not probed when they occupy the first or
                # the last alphabetical position
                # (NOTE(review): presumably because that makes the answer too
                # obvious - confirm)
                for letter in ('Г', 'Д', 'Ф', 'Ш'):
                    if mapping.get(letter) in (0, 3):
                        del mapping[letter]
            else:
                mapping = ast.literal_eval(seq_df['incorr_abc'].iloc[row])
            task.append(_letter_probe(mapping, correct))

    return task

In [202]:
# Select the sequence CSV files by extension. This does not depend on the
# (arbitrary) order of os.listdir and skips the 'stats' sub-folder.
sequence_files = sorted(
    name for name in os.listdir(seq_path_to_save) if name.lower().endswith('.csv')
)

# Define parameters for setting task type
integers = [0, 1, 2, 3, 4]  # Task indices (0 to 4)
percentage_per_integer = 0.2  # Share of trials per task index

# Matches the first Cyrillic letter of a probe pattern
cyrillic_letter = re.compile('[\u0400-\u04FF]')

for file_name in sequence_files:

    seq_path = os.path.join(seq_path_to_save, file_name)
    seq_df = pd.read_csv(seq_path, delimiter=';', index_col=0)

    # Get the number of rows in the DataFrame
    num_rows = len(seq_df)

    # Set task type for each trial in the sequence
    seq_df['task_ind'] = set_task_type(seq_df, num_rows, integers, percentage_per_integer)
    # Set correctness for each trial (50/50)
    seq_df['answer_type'] = set_correctness(num_rows, seq_df)
    # Set particular probe for each trial
    seq_df['probe_stim'] = set_task(num_rows, seq_df)

    # Filter trials by task types
    # location trials
    locs = seq_df[seq_df.task_ind.isin([1])] # simple
    rots = seq_df[seq_df.task_ind.isin([3])] # complex

    # letter trials
    letts = seq_df[seq_df.task_ind.isin([2])] # simple
    abcs = seq_df[seq_df.task_ind.isin([4])] # complex

    task_types = [locs, rots, letts]

    # Check if any probe appears in more than half of the trials of its type.
    # The warning names the probe itself, not its number of occurrences.
    for task_type in task_types:
        for probe, count in task_type['probe_stim'].value_counts().items():
            if count > len(task_type) // 2:
                task_ind = task_type['task_ind'].iloc[0]
                task_goodness = f'Value {probe} in trial type {task_ind} is out of range!'
                print(file_name[:-4], ':', task_goodness)

    # Alphabet probes: collect the probed letter and its position from every
    # pattern string first ...
    lett_list = []
    loc_list = []
    for string in abcs['probe_stim']:
        string = string.replace(' ', '')
        match = cyrillic_letter.search(string)
        char = match.group()
        # NOTE: the index is taken in the space-stripped probe string, which
        # still starts with the leading apostrophe
        position = string.index(char)
        lett_list.append(char)
        loc_list.append(position)

    lett_counter = Counter(lett_list)
    loc_counter = Counter(loc_list)

    # ... then check once, after all probes were counted, whether any letter
    # is probed in more than half of the alphabet trials
    threshold = len(abcs) // 2
    for letter, count in lett_counter.items():
        if count > threshold:
            abc_goodness = f'Value {letter} in trial type 4 is out of range!'
            print(file_name[:-4], ':', abc_goodness)

    # Save the modified DataFrame back to the CSV file
    seq_df.to_csv(seq_path, sep=';', index=True, encoding='utf-8-sig')


**LAST CHECK OF RESULTING SEQS: REPETITION OF TRIALS**

In [51]:
# Folder that holds the finished sequence files
seqs_path = '...\\sequences\\' #! SET YOUR PATH HERE
# Keep only the sequence CSV files; selecting by extension does not depend on
# the (arbitrary) order in which os.listdir returns directory entries
sequence_files = sorted(
    name for name in os.listdir(seqs_path) if name.lower().endswith('.csv')
)


In [203]:
# Task indices that belong to a modality (index 0, perception, has none)
possible_inds = [1, 2, 3, 4]

# Select the sequence CSV files by extension. This does not depend on the
# (arbitrary) order of os.listdir and skips sub-folders.
sequence_files = sorted(
    name for name in os.listdir(seqs_path) if name.lower().endswith('.csv')
)

# Define your rules
max_task_threshold = 3      # Rule 1: longest allowed run of one task index
max_modality_threshold = 5  # Rule 2: longest allowed run of one modality

for file_name in sequence_files:
    seq_path = os.path.join(seqs_path, file_name)
    seq_df = pd.read_csv(seq_path, delimiter=';', index_col=0)
    task_indices = seq_df['task_ind'].tolist()

    # Current run lengths and the values they refer to
    task_run = 0
    modality_run = 0
    last_index = None
    last_modality = None

    # Set a flag for good sequence
    good_seq = True

    # Iterate through the list of task indices. Reported line numbers start
    # at 2 for the first trial
    # (NOTE(review): presumably the line in the CSV file, whose first line is
    # the header - confirm)
    for rows_counter, index in enumerate(task_indices, start=2):

        # Check rule 1: No more than 3 tasks in a row with the same index
        if index == last_index:
            task_run += 1
        else:
            task_run = 1
            last_index = index

        if task_run > max_task_threshold:
            good_seq = False
            print(f"Line {rows_counter}: Rule 1 Violation")

        # Check rule 2: No more than 5 tasks in a row of the same modality
        if index in possible_inds:
            modality = index % 2  # Assuming 1 and 3 are visual and 2 and 4 are verbal
            if modality == last_modality:
                modality_run += 1
            else:
                modality_run = 1
                last_modality = modality

            if modality_run > max_modality_threshold:
                good_seq = False
                print(f"Line {rows_counter}: Rule 2 Violation")

        else:
            # A trial without modality interrupts the current modality run
            modality_run = 1
            last_modality = None

    # Check if both rules are satisfied
    if good_seq:
        print(f"{file_name} is GOOD!")
    else:
        print(f'{file_name[:-4]} is a bad sequence! It\'ll be REMOVED')
        os.remove(os.path.join(seqs_path, file_name))


*NOTE:* if you are not satisfied with the resulting number of sequences, repeat the generation and exclusion procedures once again.

(!) But do not forget to change range(0, 15) in the cell that generates sequences (e.g. to range(15, 30)), so that the new sequence files do not overwrite the existing ones.

**THE END**