## 1. Preprocess ##

In [3]:
import os
import sys
from pathlib import Path
import pandas as pd
import numpy as np
import math
# from tqdm.notebook import tqdm
import seaborn as sns
import matplotlib.pyplot as plt

In [11]:
# Locate the project root (the parent of the current working directory)
# and put it on the import path.
project_root = Path.cwd().parent
sys.path.append(str(project_root))

# Import the preprocessing module and reload it so the current version of
# src.preprocess is the one in use.
import importlib
import src.preprocess as preprocess
importlib.reload(preprocess)
from src.preprocess import Preprocessor_A, Preprocessor_B

# Instantiate the preprocessors.
preprocessor_a = Preprocessor_A()
preprocessor_b = Preprocessor_B()

# Import the configuration data.
from config import body_length, features_range, canvas_settings

In [12]:
# Task1a, Task1b, Task3c preprocess
def preprocess_main_a(project_root, taskID, subIDs, features_range, canvas_settings, body_length, preprocessor):
    """Convert each subject's mouse recording into a feature trajectory file.

    For every subject, reads ``<taskID>_<subID>_mouse.csv`` from
    ``<project_root>/data/raw/<taskID>``, hands it to ``preprocessor.process``
    together with the initial feature values, and writes the returned table to
    ``<project_root>/data/processed/<taskID>/<taskID>_<subID>_feature.csv``.

    Initial feature values depend on the task:
      * ``'Task1a'``: a single row with every feature set to 0.5.
      * ``'Task1b'``: the rows of the subject's ``_sti.csv`` whose ``type`` is 2.
      * any other task: the subject's whole ``_sti.csv``.

    Args:
        project_root: Root directory of the project.
        taskID: Task name; also the name of the raw/processed sub-directory.
        subIDs: Iterable of subject identifiers.
        features_range, canvas_settings, body_length: Passed through
            unchanged to ``preprocessor.process``.
        preprocessor: Object exposing ``process(taskID, feature_init, mouse,
            features_range, canvas_settings, body_length)`` whose result has
            a ``to_csv`` method.
    """
    raw_dir = Path(project_root) / 'data' / 'raw' / taskID
    out_dir = Path(project_root) / 'data' / 'processed' / taskID
    out_dir.mkdir(parents=True, exist_ok=True)

    feature_names = [
        'neck_length', 'head_length', 'leg_length', 'tail_length',
        'neck_angle', 'head_angle', 'leg_angle', 'tail_angle',
    ]

    for subID in subIDs:
        if taskID == 'Task1a':
            # No stimulus file is read for Task1a: every feature starts at 0.5.
            feature_init = pd.DataFrame({name: [0.5] for name in feature_names})
        else:
            stimuli = pd.read_csv(raw_dir / f'{taskID}_{subID}_sti.csv')
            # Task1b keeps only the type-2 rows; other tasks use the file as is.
            feature_init = stimuli[stimuli['type'] == 2] if taskID == 'Task1b' else stimuli

        mouse = pd.read_csv(raw_dir / f'{taskID}_{subID}_mouse.csv')

        trajectory = preprocessor.process(
            taskID, feature_init, mouse, features_range, canvas_settings, body_length
        )
        trajectory.to_csv(out_dir / f'{taskID}_{subID}_feature.csv', index=False)

# Task2, Task3a, Task3b preprocess
def preprocess_main_b(project_root, taskID, subIDs, preprocessor):
    """Merge every subject's stimulus and behaviour files into one table.

    For each subject the stimulus data and ``<taskID>_<subID>_bhv.csv`` are
    read from ``<project_root>/data/raw/<taskID>`` and passed to
    ``preprocessor.process``. The per-subject results get an ``iSub`` column
    in first position, are concatenated, and are written to
    ``<project_root>/data/processed/<taskID>_processed.csv``.

    Stimulus data comes from ``_sti.csv`` for ``'Task2'`` and ``'Task3a'``;
    for ``'Task3b'`` it is the merge of ``_left.csv`` and ``_right.csv`` on
    ``pairID``.

    Args:
        project_root: Root directory of the project.
        taskID: One of ``'Task2'``, ``'Task3a'`` or ``'Task3b'``.
        subIDs: Iterable of subject identifiers.
        preprocessor: Object exposing ``process(taskID, stimulus_data,
            behavior_data)`` that returns a DataFrame.

    Raises:
        ValueError: If ``taskID`` is not supported. Previously this left
            ``stimulus_data`` unbound (or silently reused the previous
            subject's data). ``pandas.concat`` also raises ``ValueError``
            when ``subIDs`` is empty.
    """
    supported_tasks = ('Task2', 'Task3a', 'Task3b')
    # Fail before touching the filesystem so a typo in taskID is obvious.
    if taskID not in supported_tasks:
        raise ValueError(
            f'Unsupported taskID {taskID!r}; expected one of {supported_tasks}'
        )

    input_path = Path(project_root) / 'data' / 'raw' / taskID
    output_path = Path(project_root) / 'data' / 'processed'
    os.makedirs(output_path, exist_ok=True)

    all_data = []
    for subID in subIDs:
        if taskID == 'Task3b':
            left_stimulus_data = pd.read_csv(input_path / f'{taskID}_{subID}_left.csv')
            right_stimulus_data = pd.read_csv(input_path / f'{taskID}_{subID}_right.csv')
            stimulus_data = pd.merge(left_stimulus_data, right_stimulus_data, on=['pairID'])
        else:  # 'Task2' or 'Task3a'
            stimulus_data = pd.read_csv(input_path / f'{taskID}_{subID}_sti.csv')

        behavior_data = pd.read_csv(input_path / f'{taskID}_{subID}_bhv.csv')

        combined_data = preprocessor.process(taskID, stimulus_data, behavior_data)
        combined_data.insert(0, 'iSub', subID)
        all_data.append(combined_data)

    processed_data = pd.concat(all_data, ignore_index=True)
    processed_data.to_csv(os.path.join(output_path, f'{taskID}_processed.csv'), index=False)

In [7]:
# Task1b, Task3c reconstruct
def preprocess_construct(project_root, taskID, subIDs):
    """Build the combined stimulus / adjusted-result table for a task.

    For each subject, the raw stimulus file is loaded and tagged with a
    ``type`` column, and one ``'adjust_after'`` row per ``'adjust_init'``
    trial is appended. Each appended row carries the trial's ``iTrial`` and
    ``body_ori`` plus the feature values taken from the end of that trial in
    ``<project_root>/data/processed/<taskID>/<taskID>_<subID>_feature.csv``.
    All subjects are concatenated, with ``iSub`` as the first column, into
    ``<project_root>/data/processed/<taskID>_processed.csv``.

    Args:
        project_root: Root directory of the project.
        taskID: ``'Task1b'`` or ``'Task3c'``.
        subIDs: Iterable of subject identifiers.

    Raises:
        ValueError: If ``taskID`` is not supported. Previously this left
            ``stimulus_data`` unbound (or silently reused the previous
            subject's data). ``pandas.concat`` also raises ``ValueError``
            when ``subIDs`` is empty.
    """
    supported_tasks = ('Task1b', 'Task3c')
    # Fail before touching the filesystem so a typo in taskID is obvious.
    if taskID not in supported_tasks:
        raise ValueError(
            f'Unsupported taskID {taskID!r}; expected one of {supported_tasks}'
        )

    raw_path = Path(project_root) / 'data' / 'raw' / taskID
    processed_path = Path(project_root) / 'data' / 'processed' / taskID
    output_path = Path(project_root) / 'data' / 'processed'

    # Loop-invariant: the feature columns copied from the trajectory file.
    feature_columns = ['neck_length', 'head_length', 'leg_length', 'tail_length',
                       'neck_angle', 'head_angle', 'leg_angle', 'tail_angle']

    all_data = []
    for subID in subIDs:
        stimulus_data = pd.read_csv(raw_path / f'{taskID}_{subID}_sti.csv')
        if taskID == 'Task1b':
            stimulus_data = stimulus_data.drop(columns=['version', 'display_height', 'PairID'])
            stimulus_data['type'] = stimulus_data['type'].replace({1: 'target', 2: 'adjust_init'})
        else:  # 'Task3c': every stimulus row is an initial state
            stimulus_data.insert(0, 'type', 'adjust_init')

        feature_trajactory = pd.read_csv(processed_path / f'{taskID}_{subID}_feature.csv')
        # NOTE(review): GroupBy.last() returns the last *non-null* value of each
        # column per trial, which differs from the literal last row if the
        # trajectory can contain NaN - confirm this is intended.
        adjust_after = feature_trajactory.groupby('iTrial').last().reset_index()

        new_rows = stimulus_data[stimulus_data['type'] == 'adjust_init'][['iTrial', 'body_ori']].copy()
        new_rows.insert(0, 'type', 'adjust_after')
        new_rows = new_rows.merge(adjust_after[['iTrial'] + feature_columns], on='iTrial', how='left')

        combined_data = pd.concat([stimulus_data, new_rows], ignore_index=True)
        combined_data.insert(0, 'iSub', subID)
        all_data.append(combined_data)

    processed_data = pd.concat(all_data, ignore_index=True)
    processed_data.to_csv(os.path.join(output_path, f'{taskID}_processed.csv'), index=False)


In [13]:
# Subjects 1 to 25.
subIDs = list(range(1, 26))

# Feature-trajectory preprocessing (currently disabled).
# preprocess_main_a(project_root, 'Task1a', subIDs, features_range, canvas_settings, body_length, preprocessor_a)
# preprocess_main_a(project_root, 'Task1b', subIDs, features_range, canvas_settings, body_length, preprocessor_a)
# preprocess_main_a(project_root, 'Task3c', subIDs, features_range, canvas_settings, body_length, preprocessor_a)

# Reconstruction of the combined tables (currently disabled).
# preprocess_construct(project_root, 'Task1b', subIDs)
# preprocess_construct(project_root, 'Task3c', subIDs)

# Stimulus/behaviour merge; only Task2 is active.
preprocess_main_b(project_root, 'Task2', subIDs, preprocessor_b)
# preprocess_main_b(project_root, 'Task3a', subIDs, preprocessor_b)

## 2. Perceptual Error Analysis

In [14]:
# Locate the project root (the parent of the current working directory)
# and put it on the import path.
project_root = Path.cwd().parent
sys.path.append(str(project_root))

# Import the error-evaluation module and reload it so the current version
# of src.error_evaluation is the one in use.
import importlib
import src.error_evaluation as error_evaluation
importlib.reload(error_evaluation)
from src.error_evaluation import Processor

# Instantiate the error-evaluation processor.
processor = Processor()

In [None]:
# Load the processed Task1b table.
processed_path = Path(project_root) / 'data' / 'processed'
processed_data = pd.read_csv(processed_path / f'Task1b_processed.csv')

# Compute the errors and their summary.
error = processor.error_calculation(processed_data)
summary = processor.error_summary(error)

# Make sure the results directory exists.
# NOTE(review): nothing in this cell writes `summary` or any figure into it.
result_path = Path(project_root) / 'results' / 'Raw'
result_path.mkdir(parents=True, exist_ok=True)

# Plot the length errors, then the angle errors.
for error_kind in ("length", "angle"):
    processor.plot_error(error, error_kind)

# Plot the errors broken down by feature.
processor.plot_error_by_feature(error)

## 3. Recording Analysis

In [7]:
# Locate the project root (the parent of the current working directory)
# and put it on the import path.
project_root = Path.cwd().parent
sys.path.append(str(project_root))

# # Import the audio-coding processor (currently disabled)
# import importlib
# import src.audio_coding as audio_coding
# importlib.reload(audio_coding)
# from src.audio_coding import Processor

# # Instantiate the processor
# processor = Processor()

In [13]:
import os
import pandas as pd
import re

# Chinese terms that refer to each body feature.
feature_synonyms = {
    'neck': ['脖子'],
    'head': ['头'],
    'leg': ['腿'],
    'tail': ['尾巴'],
}

# Adjectives grouped by the code they map to.
# The insertion order (1, 3, 2) is significant: interpret() walks the codes
# in this order and a later match overwrites an earlier one.
adjective_synonyms = {
    1: ['短'],
    3: ['长'],
    2: ['正常', '中等', '适中'],
}


def interpret(description):
    """Code a free-text description into per-feature flags.

    Returns a dict with, for every feature in ``feature_synonyms``, the keys
    ``<feature>_value``, ``<feature>_direct`` and ``<feature>_indirect``
    (all value keys first, then direct, then indirect). Every entry starts
    as the string ``'NA'``.

    * Direct statement - a feature term followed within two characters by an
      adjective: ``value`` becomes the adjective's code and ``direct`` is 1.
    * Comparison - ``比`` + feature term followed within two characters by a
      code-1 adjective sets ``value`` to 3; by a code-3 adjective sets it to
      1. In both cases ``direct`` is reset to ``'NA'`` and ``indirect`` is 1.

    A missing description (``pd.isna``) yields the all-``'NA'`` dict; any
    other input is converted with ``str`` first.
    """
    coded = {
        f'{feature}_{suffix}': 'NA'
        for suffix in ('value', 'direct', 'indirect')
        for feature in feature_synonyms
    }
    if pd.isna(description):
        return coded

    text = str(description)

    # Pass 1: direct statements such as "<term><adjective>".
    for feature, terms in feature_synonyms.items():
        for term in terms:
            for value, adjectives in adjective_synonyms.items():
                for adj in adjectives:
                    if re.search(f'{term}.{{0,2}}{adj}', text):
                        coded[f'{feature}_value'] = value
                        coded[f'{feature}_direct'] = 1
                        break

    # Pass 2: comparisons only apply when the text contains "比".
    if '比' not in text:
        return coded

    for feature, terms in feature_synonyms.items():
        for term in terms:
            if any(re.search(f'比{term}.{{0,2}}{adj}', text) for adj in adjective_synonyms[1]):
                inverted = 3
            elif any(re.search(f'比{term}.{{0,2}}{adj}', text) for adj in adjective_synonyms[3]):
                inverted = 1
            else:
                continue
            # The feature is the reference of the comparison, so its code is
            # the opposite of the adjective and counts as indirect evidence.
            coded[f'{feature}_value'] = inverted
            coded[f'{feature}_direct'] = 'NA'
            coded[f'{feature}_indirect'] = 1

    return coded

In [16]:
import pandas as pd
import re

# Map each adjective to its numeric code (1 = short, 2 = medium, 3 = long).
value_mapping = {
    '长': 3,
    '比较长': 3,
    '很长': 3,
    '短': 1,
    '比较短': 1,
    '很短': 1,
    '中等': 2,
    '适中': 2
}

# Body-part terms that may appear in a description.
body_parts = ['头', '脖子', '腿', '尾巴']

# Result key filled in for each body-part term.
body_part_keys = {
    '头': 'head_value',
    '脖子': 'neck_value',
    '腿': 'leg_value',
    '尾巴': 'tail_value',
}

# Compiled regular expressions.
# Matches any adjective.
adjective_pattern = re.compile('|'.join(value_mapping.keys()))
# Matches any body part.
body_part_pattern = re.compile('|'.join(body_parts))


def extract_values(text):
    """Code a free-text description into numeric values per body part.

    The text is split on Chinese or ASCII commas. In each segment the first
    adjective found (see ``value_mapping``) is applied to every body part
    mentioned in that segment, so "头和脖子长，腿短" codes head and neck as
    3 and leg as 1. Only one adjective per segment is used.

    Fixes over the previous version:
      * values are stored under the declared ``neck_value`` / ``head_value``
        / ``leg_value`` / ``tail_value`` keys instead of keys built from the
        Chinese term (e.g. ``'头_value'``);
      * body parts are located with ``body_part_pattern`` rather than by
        exact equality with a split fragment, so "腿短" is recognised;
      * non-string input is converted with ``str`` before splitting instead
        of raising ``TypeError``;
      * ``noinfo`` is 0 only when at least one body part was coded.

    Args:
        text: The description; may be missing (``None``/NaN) or non-string.

    Returns:
        dict with keys ``invalid`` (1 if the text is missing or blank),
        ``noinfo`` (1 if the text is valid but no body part was coded) and
        the four ``*_value`` keys (``None`` when not coded).
    """
    result = {
        'invalid': 0,
        'noinfo': 0,
        'neck_value': None,
        'head_value': None,
        'leg_value': None,
        'tail_value': None
    }

    if pd.isnull(text) or str(text).strip() == '':
        result['invalid'] = 1
        return result

    # Whether at least one body part received a value.
    has_info = False

    # Each comma-separated segment is coded independently.
    for item in re.split('[，,]', str(text)):
        adjective_match = adjective_pattern.search(item)
        if adjective_match is None:
            # No adjective in this segment: nothing to code.
            continue

        value = value_mapping[adjective_match.group()]
        for part in body_part_pattern.findall(item):
            result[body_part_keys[part]] = value
            has_info = True

    result['noinfo'] = 0 if has_info else 1
    return result

In [17]:

# Code every Task2 recording file (*rec.csv) with extract_values() and save
# the result under the same file name in data/processed/Task2.
input_dir = Path(project_root) / 'data' / 'raw' / 'Task2'
output_dir = Path(project_root) / 'data' / 'processed' / 'Task2'
output_dir.mkdir(parents=True, exist_ok=True)

for filename in os.listdir(input_dir):
    if not filename.endswith('rec.csv'):
        continue

    input_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)

    # Try UTF-8 first and fall back to GBK if decoding fails.
    try:
        df = pd.read_csv(input_path, encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(input_path, encoding='gbk')

    # One dict per row, expanded into one column per key.
    extracted_data = df['text'].apply(extract_values).apply(pd.Series)
    df = pd.concat([df, extracted_data], axis=1)

    df.to_csv(output_path, index=False, encoding='utf-8')

In [18]:

# Code every Task2 recording file (*rec.csv) with interpret() and save the
# result under the same file name in data/processed/Task2, replacing any
# file of that name already there.
input_dir = Path(project_root) / 'data' / 'raw' / 'Task2'
output_dir = Path(project_root) / 'data' / 'processed' / 'Task2'
os.makedirs(output_dir, exist_ok=True)

for filename in os.listdir(input_dir):
    if not filename.endswith('rec.csv'):
        continue

    input_path = os.path.join(input_dir, filename)
    output_path = os.path.join(output_dir, filename)

    # Try UTF-8 first and fall back to GBK if decoding fails.
    try:
        df = pd.read_csv(input_path, encoding='utf-8')
    except UnicodeDecodeError:
        df = pd.read_csv(input_path, encoding='gbk')

    # A row is invalid (1) when its text is missing; vectorised instead of
    # a per-row iterrows()/df.at loop.
    df['invalid'] = df['text'].isna().astype(int)

    # Interpret each row once; keep the dicts so 'noinfo' can be derived
    # from them without iterating over the DataFrame again.
    interpreted = df['text'].apply(interpret)
    features_df = interpreted.apply(pd.Series)

    # Concatenate the original dataframe with the new feature columns.
    result_df = pd.concat([df, features_df], axis=1)

    # 'noinfo' is 'NA' for invalid rows, 1 when no feature value was coded,
    # and 0 otherwise.
    result_df['noinfo'] = [
        'NA' if invalid == 1
        else int(all(coded[f'{feature}_value'] == 'NA' for feature in feature_synonyms))
        for invalid, coded in zip(df['invalid'], interpreted)
    ]

    # Column order: 'noinfo' right after 'invalid', 'text' last, everything
    # else in its existing order.
    ordered = [col for col in result_df.columns if col not in ('noinfo', 'text')]
    ordered.insert(ordered.index('invalid') + 1, 'noinfo')
    ordered.append('text')
    result_df = result_df[ordered]

    # Save the resulting dataframe to a new CSV file.
    result_df.to_csv(output_path, index=False, encoding='utf-8')