In [1]:
import os
import json
import random
import shutil
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from post_repp_utils import combine_tasks_to_csv

### Save result as CSV

In [2]:
base_pkl_dir = r"output\italy-group1-aws"
sub_dirs = ["Task 1", "Task 2"]

combine_tasks_to_csv(base_pkl_dir, sub_dirs, "all_tasks_combined.csv")

Combined CSV saved: all_tasks_combined.csv


### Load csv

In [3]:
# output_exp_dir = r"output\mali-group1-final"
output_exp_dir = base_pkl_dir

csv_file = os.path.join(output_exp_dir, "all_tasks_combined.csv")        # TapTrialMusic.csv
final_df = pd.read_csv(csv_file)

df_passed_trials = final_df[final_df['stats.failed'] == False].copy()
df_failed_trials = final_df[final_df['stats.failed'] == True].copy()

print("Total trials:", final_df.shape[0])
print("Number of passed trials:", len(df_passed_trials))
print("Number of failed trials:", len(df_failed_trials))


Total trials: 360
Number of passed trials: 360
Number of failed trials: 0


### Copy failed trials to different folder for inspection

In [6]:

src_dir = os.path.join(output_exp_dir, "Task 2")        # choose Task 1 or Task 2 directory
failed_dest_dir = os.path.join(output_exp_dir, "failed")
os.makedirs(failed_dest_dir, exist_ok=True)

for num in range(df_failed_trials.shape[0]):
    audio_filename = df_failed_trials.iloc[num]['audio_filename'] + ".wav"
    plot_filename = df_failed_trials.iloc[num]['audio_filename'] + ".png"

    p_id = df_failed_trials.iloc[num]['participant_id']
    src_audio_filename = os.path.join(src_dir, f"participant_{p_id}" ,audio_filename)
    src_plot_filename =  os.path.join(src_dir, f"participant_{p_id}", plot_filename)
    
    if os.path.exists(src_audio_filename):
        print(f"File {src_audio_filename} exists. Copying to failed directory.")

        dest_audio_filename = os.path.join(failed_dest_dir, audio_filename)
        dest_plot_filename = os.path.join(failed_dest_dir, plot_filename)
        
        shutil.copy2(src_audio_filename, dest_audio_filename)
        shutil.copy2(src_plot_filename, dest_plot_filename)
    else:
        print(f"File {src_audio_filename} does not exist.")
    # print(src_audio_filename)

File output\mali-group1-mpi\Task 2\participant_2\participant_2__node_13__trial_27__trial_main_page.wav does not exist.
File output\mali-group1-mpi\Task 2\participant_2\participant_2__node_16__trial_25__trial_main_page.wav does not exist.
File output\mali-group1-mpi\Task 2\participant_2\participant_2__node_17__trial_26__trial_main_page.wav does not exist.
File output\mali-group1-mpi\Task 2\participant_2\participant_2__node_9__trial_29__trial_main_page.wav does not exist.
File output\mali-group1-mpi\Task 2\participant_2\participant_2__node_22__trial_36__trial_main_page.wav exists. Copying to failed directory.
File output\mali-group1-mpi\Task 2\participant_2\participant_2__node_26__trial_37__trial_main_page.wav exists. Copying to failed directory.
File output\mali-group1-mpi\Task 2\participant_4\participant_4__node_30__trial_78__trial_main_page.wav exists. Copying to failed directory.
File output\mali-group1-mpi\Task 2\participant_4\participant_4__node_31__trial_75__trial_main_page.wav ex

### Copy all plots to one place

In [5]:
import os
import shutil

# main_dir = r"output\oslo-pilot2"
main_dir = base_pkl_dir



work_sub_dir = "Task 2"
work_dir = os.path.join(main_dir, work_sub_dir)

dest_dir = os.path.join(main_dir, f"all_pngs_{work_sub_dir}") # r"all_pngs_Task2"
os.makedirs(dest_dir, exist_ok=True)

for root, dirs, files in os.walk(work_dir):
    for f in files:
        if f.lower().endswith(".png"):
            src = os.path.join(root, f)
            dst = os.path.join(dest_dir, f)

            # if duplicate filenames exist, rename automatically
            if os.path.exists(dst):
                base, ext = os.path.splitext(f)
                i = 1
                while os.path.exists(os.path.join(dest_dir, f"{base}_{i}{ext}")):
                    i += 1
                dst = os.path.join(dest_dir, f"{base}_{i}{ext}")

            shutil.copy2(src, dst)


### Copy selected flagged trials to different folder for inspection

In [None]:

src_dir = os.path.join(output_exp_dir, "Task 1")        # choose Task 1 or Task 2 directory
failed_dest_dir = os.path.join(output_exp_dir, "Rainer-flagged")
os.makedirs(failed_dest_dir, exist_ok=True)

selected_failed = [         # Task 1
    "participant_11__node_16__trial_156__trial_main_page",
    "participant_15__node_10__trial_239__trial_main_page",
    "participant_15__node_15__trial_237__trial_main_page",
    "participant_23__node_14__trial_362__trial_main_page",
    "participant_29__node_14__trial_487__trial_main_page",
    "participant_35__node_15__trial_614__trial_main_page",
]

# selected_failed = [     # Task 2
#     "participant_4__node_26__trial_17__trial_main_page",
# "participant_11__node_23__trial_164__trial_main_page",
# "participant_11__node_26__trial_162__trial_main_page",
# "participant_11__node_27__trial_161__trial_main_page",
# "participant_12__node_25__trial_181__trial_main_page",
# "participant_15__node_28__trial_246__trial_main_page",
# "participant_23__node_28__trial_369__trial_main_page",
# "participant_24__node_32__trial_392__trial_main_page",
# "participant_28__node_24__trial_476__trial_main_page",
# "participant_28__node_25__trial_477__trial_main_page",
# "participant_28__node_28__trial_479__trial_main_page",
# "participant_35__node_32__trial_624__trial_main_page",
# ]



for flagged_trial in selected_failed:
    audio_filename = flagged_trial + ".wav"
    plot_filename = flagged_trial + ".png"

    parts = flagged_trial.split("__")
    p_id = parts[0].split("_")[1]
    
    src_audio_filename = os.path.join(src_dir, f"participant_{p_id}" ,audio_filename)
    src_plot_filename =  os.path.join(src_dir, f"participant_{p_id}", plot_filename)
    
    if os.path.exists(src_audio_filename):
        print(f"File {src_audio_filename} exists. Copying to failed directory.")

        dest_audio_filename = os.path.join(failed_dest_dir, audio_filename)
        dest_plot_filename = os.path.join(failed_dest_dir, plot_filename)
        
        shutil.copy2(src_audio_filename, dest_audio_filename)
        shutil.copy2(src_plot_filename, dest_plot_filename)
    else:
        print(f"File {src_audio_filename} does not exist.")
    # print(src_audio_filename)

### Inspect/modify single trial

In [None]:
# import matplotlib.image as mpimg
# %matplotlib inline

# from post_repp_utils import (
#     # setup_participant_directories,
#     process_participant_audio_files,
#     run_repp_analysis_for_participant,
#     )

# output_exp_dir = r"output\marker max error 33\mali"     # contains Task 1 and Task 2 outputs

# row_idx = 4
# trial_id = df_failed_trials.iloc[row_idx]['trial_id']
# trial_maker_id = df_failed_trials.iloc[row_idx]['trial_maker_id']
# participant_id = df_failed_trials.iloc[row_idx]['participant_id']
# print(f"Trial id: {trial_id}")


# ##### Show failed trial plot
# failed_trial_plot_path = os.path.join(output_exp_dir, trial_maker_id, f"participant_{participant_id}", df_failed_trials.iloc[row_idx]['audio_filename'] + ".png")

# plt.clf()
# plt.figure(figsize=(15, 6))
# img = mpimg.imread(failed_trial_plot_path)
# imgplot = plt.imshow(img)
# plt.axis('off')
# plt.tight_layout()

# #####

# ### original data directory
# src_dir = r"D:\pyspace\Djembe\psynet\data_2025\Group-1\November-2025"
# src_exp_dir = r"mali-group1-final"
# base_dir = os.path.join(src_dir, src_exp_dir)

# TapTrialMusic_path = os.path.join(base_dir, "data", "TapTrialMusic.csv")
# TapTrialMusic_df = pd.read_csv(TapTrialMusic_path)
# ###

# output_audio_filename = df_failed_trials.iloc[row_idx]['audio_filename'] + ".wav"
# src_audio_filename = output_audio_filename.split("__", 1)[1]

# src_participant_dir = os.path.join(base_dir, "assets", trial_maker_id, "participants", f"participant_{participant_id}")
# output_participant_dir = os.path.join(output_exp_dir, trial_maker_id, f"participant_{participant_id}")


In [None]:
# audio_stim_pairs = process_participant_audio_files(
#     participant_id,
#     [src_audio_filename],        # list of audio filenames
#     src_participant_dir, 
#     output_participant_dir, 
#     TapTrialMusic_df,
#     overwrite=True  # Set to True to reprocess existing files
# )    

In [None]:
# from custom_config import sms_tapping     # see custom_config.py
# from repp.config import ConfigUpdater

# config_params= ConfigUpdater.create_config(
#     sms_tapping,    # see custom_config.py
#     {
#         'EXTRACT_THRESH': [0.12, 0.12],        # [0.225, 0.12]
#         'EXTRACT_COMPRESS_FACTOR': 1.1,
#         'EXTRACT_FIRST_WINDOW': [18, 18],           # [18, 18]
#         'EXTRACT_SECOND_WINDOW': [26, 120],         # [26, 120]
#         'MARKERS_MATCHING_WINDOW': 35.0,
#         'TAPPING_RANGE': [200, 400],            # [200, 400]
#         'MARKERS_MAX_ERROR': 33,               # 30 ms, 33 for future non20
#         ## TODO: add a parameter that extend the MARKER ERROR THRESHOLD to 20.

#     }
# )

# # Run REPP analysis for all recordings
# results = run_repp_analysis_for_participant(
#     audio_stim_pairs,
#     output_participant_dir,
#     config_params,
#     title_plot= None,
#     display_plots=False,
#     figsize=(14, 12)
#     )

### Read Questionaire Data from Response.csv

In [None]:
with open('question_mapping.json', 'r') as file:
    name_to_title = json.load(file)
name_to_title

In [None]:
import os
import json
import pandas as pd
from datetime import datetime

current_year = datetime.now().year

base_dir = r"D:\pyspace\Djembe\psynet\data_2025\Group-1\October-2025\mali-group1-oct16"
part_demographics_path = os.path.join(base_dir, "regular", "data", "Response.csv")

df = pd.read_csv(part_demographics_path)


valid_qs = [f"question{i}" for i in range(1, 23)]
records = {}

for _, row in df.iterrows():
    pid = row["participant_id"]
    ans = row["answer"]

    # Skip empty
    if pd.isna(ans):
        continue

    # Try to detect dictionary-like entries
    ans = ans.strip()

    # CASE 1: dictionary (your target)
    if ans.startswith("{") and ans.endswith("}"):
        try:
            data = json.loads(ans)
        except:
            continue
        
        # collect answers
        if pid not in records:
            records[pid] = {}

        for q in valid_qs:
            if q in data:
                records[pid][q] = data[q]

    # CASE 2: list (ignore)
    elif ans.startswith("[") and ans.endswith("]"):
        continue

    # CASE 3: anything else â†’ ignore
    else:
        continue

# Convert to DataFrame
demographics_df = pd.DataFrame.from_dict(records, orient="index").reset_index()
demographics_df = demographics_df.rename(columns={"index": "participant_id"})
demographics_df = demographics_df.rename(columns=name_to_title)

# Assuming column is now renamed to "Year of birth"
demographics_df["Age"] = current_year - demographics_df["Year of birth"].astype(int)

save_path = os.path.join(base_dir, "participant_demographics.csv")
demographics_df.to_csv(save_path, index=False)

demographics_df.head(10)

# Add a new column for each question with its human-readable title
# for q, title in name_to_title.items():
#     if q in demographics_df.columns:
#         demographics_df[title] = demographics_df[q]

### Per trial and participant csv read