In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from detecta import detect_peaks, detect_onset
from glob import glob
from tqdm import tqdm
import scipy.signal as signal
import warnings
warnings.filterwarnings('ignore')

def lowpass_filter(data, sr, cut_off, order):
    """Apply a forward-backward (zero-phase) Butterworth low-pass filter.

    Parameters
    ----------
    data : array-like
        Signal to filter.
    sr : float
        Sampling rate of ``data``; the Nyquist frequency is taken as ``sr / 2``.
    cut_off : float
        Cut-off frequency, in the same unit as ``sr``.
    order : int
        Order handed to ``signal.butter``.

    Returns
    -------
    The output of ``signal.filtfilt`` for ``data``.
    """
    nyquist = 0.5 * sr
    normalized_cutoff = cut_off / nyquist
    numerator, denominator = signal.butter(order, normalized_cutoff, btype='low')
    # padlen=0 disables the edge padding filtfilt would otherwise add.
    return signal.filtfilt(numerator, denominator, data, padlen=0)

def ang_vel_made(data, sr):
    """Differentiate ``data`` sample-to-sample at sampling rate ``sr``.

    Returns a ``pd.Series`` with ``len(data) - 1`` values labelled
    ``1 .. len(data) - 1``: the value at label ``k`` is
    ``(data[k] - data[k - 1]) / (1 / sr)``. Label 0 is absent.
    """
    sample_period = 1 / sr
    rates = np.diff(data) / sample_period
    return pd.Series(rates, index=np.arange(1, len(data)))

# CHS
def line_vel_made(data, sr):
    """Finite-difference velocity of ``data`` along its first axis.

    Consecutive differences are divided by the sample interval ``1 / sr``.
    The result is a ``pd.Series`` labelled ``1 .. len(data) - 1`` (there is
    no value for the first sample).
    """
    dt = 1 / sr  # time between two consecutive samples
    frame_labels = np.arange(1, len(data))
    return pd.Series(np.diff(data, axis=0) / dt, index=frame_labels)

# 다운샘플링 함수
def downsample_data(data, original_freq, target_freq):
    """Down-sample ``data`` by keeping every ``original_freq // target_freq``-th item.

    Parameters
    ----------
    data : sliceable sequence (list, array, Series, DataFrame, ...)
        Data sampled at ``original_freq``.
    original_freq, target_freq : int or float
        Source and desired sampling rates, in the same unit.

    Returns
    -------
    ``data[::factor]`` where ``factor = original_freq // target_freq``. When
    the ratio is not a whole number the factor is floored, so the effective
    output rate is ``original_freq / factor`` rather than ``target_freq``.

    Raises
    ------
    ValueError
        If the decimation factor is below 1, i.e. ``target_freq`` is larger
        than ``original_freq`` or the rates have opposite signs.
    """
    # int() keeps the slice step valid when the rates are given as floats.
    factor = int(original_freq // target_freq)
    if factor < 1:
        raise ValueError(
            f"target_freq ({target_freq}) must not exceed original_freq "
            f"({original_freq}); cannot down-sample by a factor of {factor}"
        )
    return data[::factor]

In [2]:
# Session identifier (looks like a YYYYMMDD date). It is interpolated into the
# "txt file/", "c3d file/" and "processed file/" paths used by the cells below.
day = '20240307'

In [3]:
# Every export of this session whose name ends in "txt", with Windows path
# separators normalised to '/'.
file_list = [path.replace('\\', '/') for path in sorted(glob(f"txt file/{day}/*txt"))]

# Trial name = basename up to the first '.'. Taking element 0 of the split
# (instead of unpacking exactly three parts) keeps this working for basenames
# that contain more or fewer than two dots.
file_name = [os.path.basename(path).split('.')[0] for path in file_list]

file_name

['PYS_002_R_75_180_OH_116_S', 'SBB_003_R_65_167_OH_117_S']

In [4]:
# Preview this session's player-information workbook, using its 'trial' column as the index.
# The frame is only displayed as the cell output; it is not assigned to a variable.
pd.read_excel(f'c3d file/{day}/player_information.xlsx',index_col = 'trial')

Unnamed: 0_level_0,ball_speed,strike,weight,height,type,Unnamed: 6,cop
trial,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
PYS_001,117,S,75,180,OH,,17
PYS_002,118,S,75,180,OH,,21
PYS_003,119,S,75,180,OH,,20
PYS_004,120,S,75,180,OH,,15
PYS_005,121,S,75,180,OH,,26
PYS_006,122,S,75,180,OH,,25
PYS_007,123,S,75,180,OH,,17
PYS_008,124,S,75,180,OH,,20
PYS_009,125,S,75,180,OH,,35
PYS_010,126,S,75,180,OH,,15


In [17]:
# Processed frames per trial name: dataset['force'][name] / dataset['kine'][name].
dataset = {'force': {}, 'kine': {}}

# Same layout, for trials whose swing events could not be located.
wrong = {kind: {} for kind in ('force', 'kine')}

# Placeholder dict; nothing is stored in it by this cell.
non_force = {}

# ---------------------------------------------------------------------------
# Constants used by the per-trial processing loop
# ---------------------------------------------------------------------------
KINE_SR = 200        # sampling rate used for every kinematic derivative / filter
FORCE_SR = 1000      # sampling rate passed to the force-plate low-pass filter
FORCE_PER_KINE = 6   # force rows kept per kinematic frame; also scales event frames to force rows
# NOTE(review): FORCE_PER_KINE (6) is not FORCE_SR / KINE_SR (5). One of the
# two is probably wrong for this lab set-up; both values are kept exactly as
# they were until that is confirmed.

# Impact frames identified by hand, keyed by trial name.
MANUAL_IMPACT_FRAME = {
    'SBB_003_R_65_167_OH_117_S': 464,
    'PYS_002_R_75_180_OH_116_S': 728,
}

_JOINT_KEYWORDS = ('ANKLE', 'HIP', 'KNEE', 'HEEL', 'ELBOW', 'SHOULDER', 'WRIST')
_SEGMENT_COLUMNS = {
    'RFA_X', 'RFA_Y', 'RFA_Z', 'LFA_X', 'LFA_Y', 'LFA_Z',
    'RHA_X', 'RHA_Y', 'RHA_Z', 'LHA_X', 'LHA_Y', 'LHA_Z',
}


def _handedness_from_name(name):
    """Return 'R' or 'L' from the third '_'-separated token of a trial name.

    The same positional convention is already used for weight (token 3) and
    height (token 4). Searching the whole file path for the letter 'R' would
    also match initials or folder names.

    Raises ValueError when the token is neither 'R' nor 'L'.
    """
    token = name.split('_')[2]
    if token not in ('R', 'L'):
        raise ValueError(f"cannot read handedness ('R'/'L') from trial name {name!r}")
    return token


def _relabel_joint_sides(columns, pit_type):
    """Replace the leading side token of joint columns with REAR / LEAD.

    Right-handed: right side -> REAR, left side -> LEAD; mirrored for
    left-handed. Columns without a joint keyword are returned unchanged.
    """
    right_label, left_label = ('REAR', 'LEAD') if pit_type == 'R' else ('LEAD', 'REAR')
    relabelled = []
    for col in columns:
        if any(key in col for key in _JOINT_KEYWORDS):
            parts = col.split('_')
            if 'R' in parts[0]:
                parts[0] = right_label
            elif 'L' in parts[0]:
                parts[0] = left_label
            col = '_'.join(parts)
        relabelled.append(col)
    return relabelled


def _strip_side_from_segments(columns):
    """Rename a third token LTH/RTH to TH and LAR/RAR to AR."""
    renamed = []
    for col in columns:
        if any(tag in col for tag in ('LTH', 'RTH', 'LAR', 'RAR')):
            parts = col.split('_')
            # The substring test above also matches e.g. 'ANGULAR'; only the
            # third token is ever rewritten, and short names are left alone.
            if len(parts) > 2:
                if parts[2] in ('LTH', 'RTH'):
                    parts[2] = 'TH'
                elif parts[2] in ('LAR', 'RAR'):
                    parts[2] = 'AR'
            col = '_'.join(parts)
        renamed.append(col)
    return renamed


def _relabel_segment_sides(columns, pit_type):
    """Prefix the RFA/LFA/RHA/LHA columns with LEAD_ / REAR_.

    NOTE(review): for a right-handed player this maps R -> LEAD and
    L -> REAR, the opposite of the joint mapping. The mapping is preserved
    as it was; confirm that it is intentional.
    """
    right_prefix, left_prefix = ('LEAD_', 'REAR_') if pit_type == 'R' else ('REAR_', 'LEAD_')
    relabelled = []
    for col in columns:
        if col in _SEGMENT_COLUMNS:
            if col.startswith('R'):
                col = right_prefix + col[1:]
            elif col.startswith('L'):
                col = left_prefix + col[1:]
        relabelled.append(col)
    return relabelled


def _unwrap_degrees(values, jump=300, period=360):
    """Remove wrap-around jumps from an angle trace.

    Every sample-to-sample increase larger than ``jump`` shifts all later
    samples by ``-period``; every decrease larger than ``jump`` shifts them
    by ``+period``. Shifts accumulate, so alternating jumps cancel and
    repeated jumps in one direction keep the trace continuous.
    """
    values = np.asarray(values, dtype=float)
    step = np.diff(values)
    correction = np.where(step > jump, -period, 0) + np.where(step < -jump, period, 0)
    unwrapped = values.copy()
    unwrapped[1:] += np.cumsum(correction)
    return unwrapped


for file, name in zip(file_list, file_name):
    # ---- metadata encoded in the trial name --------------------------------
    name_parts = name.split('_')
    pit_type = _handedness_from_name(name)
    weight = float(name_parts[3])
    height = float(name_parts[4])
    session = f"{name_parts[0]}_{name_parts[1]}"

    # ---- load and split into kinematic / force-plate tables -----------------
    df = pd.read_csv(file, sep='\t', encoding='cp949', header=[1,2,3,4])
    df = df.drop('Unnamed: 0_level_0', axis=1)
    df = df.drop('LAR_ROTMAT', axis=1)
    kinematic_len = len(df['FRAMES'].dropna())
    kinetic_len = kinematic_len * FORCE_PER_KINE

    cols = [col for col in df.columns if col[0] not in ['FP1','FP2']]

    # .copy() makes both tables independent frames, so the column assignments
    # below really modify them (also under pandas copy-on-write).
    kine = df[cols].iloc[:kinematic_len, :].copy()
    force = df[['FP1','FP2']].iloc[:kinetic_len,:].copy()

    # ---- force plates: FP1 is the rear foot, FP2 the lead foot ---------------
    plate_side = {'FP1': 'REAR', 'FP2': 'LEAD'}
    for_cols = []
    for side, data, marker, axis in force.columns:
        for_cols.append(f"{plate_side.get(side, side)}_{data}_{axis}")
    force.columns = for_cols

    for col in for_cols:
        if ('FORCE' in col) or ('FREEMOMENT' in col):
            filtered = lowpass_filter(force[col], FORCE_SR, 40, 4)  # cut-off 40
            # Scale by 100 / 9.81 / weight (presumably N -> % body weight,
            # with weight in kg; confirm the units).
            force[col] = 100 * (filtered / 9.81) / weight

    # ---- kinematics: flatten the 4-level header, then relabel sides ----------
    kine_cols = []
    for data, _level1, ro, axis in kine.columns:
        if data in ['FRAMES','TIME']:
            kine_cols.append(data)
        elif data in ['DistEndPos','ProxEndPos']:
            kine_cols.append(f"{data}_{ro}_{axis}")
        else:
            kine_cols.append(f"{data}_{axis}")

    kine_cols = _relabel_joint_sides(kine_cols, pit_type)
    kine_cols = _strip_side_from_segments(kine_cols)
    newcols = _relabel_segment_sides(kine_cols, pit_type)
    kine.columns = newcols

    ang_cols = [col for col in newcols if 'ANGLE' in col]
    vel_cols = [col for col in newcols if 'ANGULAR' in col]

    # ---- remove +/-360 degree wrap-around jumps from the angle traces --------
    for col in ang_cols:
        kine[col] = _unwrap_degrees(kine[col])

    # ---- derived velocities ---------------------------------------------------
    # The 'ANGLUAR' spelling is part of the output column names and is kept.
    # NOTE(review): these columns are created after vel_cols is built, so they
    # are not sign-flipped for left-handed players below; confirm.
    for segment in ('PELVIS', 'TORSO'):
        for ax_name in ('X', 'Y', 'Z'):
            kine[f'{segment}_ANGLUAR_VELOCITY_{ax_name}'] = ang_vel_made(
                kine[f'{segment}_ANGLE_{ax_name}'], KINE_SR)

    # club head speed components, from the END_* position columns
    for ax_name in ('X', 'Y', 'Z'):
        kine[f'club_{ax_name}'] = line_vel_made(kine[f'END_{ax_name}'], KINE_SR)

    kine = kine.fillna(0)
    force = force.fillna(0)

    for col in kine.columns:
        if col not in ['FRAMES','TIME']:
            kine[col] = lowpass_filter(kine[col], KINE_SR, 13.4, 4)  # cut-off 13.4

    # Left-handed players: flip the Y and Z components of angles and angular velocities.
    if pit_type == 'L':
        for col in [*ang_cols, *vel_cols]:
            if col.split('_')[-1] in ['Y', 'Z']:
                kine[col] = - kine[col]

    stride_length = round(100 * (kine['LEAD_HEEL_Y'] - kine['REAR_HEEL_Y']).max() * 100 / height,2)

    # resultant club head speed
    CHS = np.sqrt(kine['club_X']**2 + kine['club_Y']**2 + kine['club_Z']**2)
    kine['CHS'] = CHS

    sr = KINE_SR
    plot = False

    # Stored before event detection, as before: a trial that fails below stays
    # in `dataset` (without event columns) and is additionally put in `wrong`.
    # NOTE(review): confirm that failed trials are meant to be exported too.
    dataset['force'][name] = force
    dataset['kine'][name] = kine

    try:
        address = detect_onset(CHS, threshold=0.5, n_above=sr*0.5, show=plot)[0][0]
        top = [i for i in detect_peaks(CHS, mph=3, mpd=sr*0.1, valley=True, show=plot) if address < i][0]

        # Impact comes from the manual table only. A trial without an entry is
        # reported instead of silently reusing the previous trial's frame.
        if name not in MANUAL_IMPACT_FRAME:
            raise LookupError(f"no manually identified impact frame for {name}")
        impact = MANUAL_IMPACT_FRAME[name]
        finish = impact + 40

        if not (address < top < impact < finish):
            raise ValueError(
                f"events out of order: address={address}, top={top}, "
                f"impact={impact}, finish={finish}")

        kine['address'] = address
        kine['top']     = top
        kine['impact']  = impact
        kine['finish']  = finish
        kine['height']  = height
        kine['weight']  = weight
        kine['stride_length'] = stride_length

        force_address = address * FORCE_PER_KINE
        force_finish = finish * FORCE_PER_KINE
        force['address'] = force_address
        force['top']     = top * FORCE_PER_KINE
        force['impact']  = impact * FORCE_PER_KINE
        force['finish']  = force_finish
        print(address, top, impact, finish)

        # First peak of the lead-foot vertical force between address and finish.
        lead_peak = detect_peaks(force['LEAD_FORCE_Z'].iloc[force_address:force_finish], mph=100, mpd=50, show=plot) + force_address
        force['lead_peak_z'] = lead_peak[0]

        # Valley of the lead-foot Y force in the same window (the huge mpd
        # leaves at most one valley).
        lead_valley = detect_peaks(force['LEAD_FORCE_Y'].iloc[force_address:force_finish], mph=0, mpd=10000000, valley=True, show=plot) + force_address
        force['lead_valley_y'] = lead_valley[0]

    except Exception as err:
        print(f"{name} 에서 시점 찾기 오류 발생")
        # Show why event detection failed instead of hiding the cause.
        print(f"    {type(err).__name__}: {err}")
        wrong['kine'][name] = kine
        wrong['force'][name] = force

424 673 728 768
250 406 464 504


In [10]:
# Export every trial held in `dataset` as one kinematic and one force CSV.
kine_dir = f"processed file/{day}/kine"
force_dir = f"processed file/{day}/force"

# to_csv does not create missing folders, so make sure both targets exist
# (no error if they are already there).
os.makedirs(kine_dir, exist_ok=True)
os.makedirs(force_dir, exist_ok=True)

for name in dataset['kine']:
    kine = dataset['kine'][name]
    force = dataset['force'][name]
    kine.to_csv(f"{kine_dir}/{name}.csv", index=False)
    force.to_csv(f"{force_dir}/{name}.csv", index=False)

In [7]:
# Parked snippets kept inside a bare string; nothing in it is executed. As the
# cell's only expression, the string is just echoed back as the cell output.
# Its two Korean headings read "code that flips signs" and "force down-sampling".
''' 
부호 바꾸는 코드
    change_minus_plus = ['LEAD_SHOULDER_ANGLE_Z','LEAD_KNEE_ANGLE_X',
                         'TORSO_ANGLE_X','TORSO_ANGLE_Y','LEAD_ELBOW_ANGULAR_VELOCITY_X', 'LEAD_SHOULDER_ANGLE_Y']
    if pit_type == 'L':
        change_minus_plus.extend(['PELVIS_ANGLUAR_VELOCITY_Z','TORSO_ANGLUAR_VELOCITY_Z','LEAD_SHOULDER_ANGLE_X'])
        
    kine[change_minus_plus] = - kine[change_minus_plus]
    
힘값 다운샘플링
    force = downsample_data(force, 1000, 200)
    force.reset_index(drop=True, inplace=True)
'''

" \n부호 바꾸는 코드\n    change_minus_plus = ['LEAD_SHOULDER_ANGLE_Z','LEAD_KNEE_ANGLE_X',\n                         'TORSO_ANGLE_X','TORSO_ANGLE_Y','LEAD_ELBOW_ANGULAR_VELOCITY_X', 'LEAD_SHOULDER_ANGLE_Y']\n    if pit_type == 'L':\n        change_minus_plus.extend(['PELVIS_ANGLUAR_VELOCITY_Z','TORSO_ANGLUAR_VELOCITY_Z','LEAD_SHOULDER_ANGLE_X'])\n        \n    kine[change_minus_plus] = - kine[change_minus_plus]\n    \n힘값 다운샘플링\n    force = downsample_data(force, 1000, 200)\n    force.reset_index(drop=True, inplace=True)\n    \n    "