### 강의에서 소개된 파이썬 주요 기능
- ffmpeg: https://anaconda.org/conda-forge/ffmpeg
- scipy.signal.savgol_filter: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html
- matplotlib.pyplot.quiver: https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.quiver.html
- matplotlib.animation.FFMpegWriter: https://matplotlib.org/stable/api/_as_gen/matplotlib.animation.FFMpegWriter.html
- matplotlib.pyplot.clf: https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.clf.html
- matplotlib.pyplot.close: https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.close.html

### 선수별 속도 벡터 및 속력 산출

In [None]:
import os
import warnings
import numpy as np
import pandas as pd
import scipy.signal as signal
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from src.plot_utils import draw_pitch

warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

##### (1) 결합 데이터 불러오기

In [None]:
match_id = 1
file = f'data_metrica/Sample_Game_{match_id}/Sample_Game_{match_id}_IntegratedData.csv'
traces = pd.read_csv(file, header=0, index_col=0)
traces

##### (2) 특정 선수 속도/속력 산출

In [None]:
period = 1
player = 'H11'
player_cols = ['period', 'time', f'{player}_x', f'{player}_y']
player_traces = traces.loc[traces['period'] == period, player_cols]
player_traces

In [None]:
dt = player_traces['time'].diff()
vx = player_traces[f'{player}_x'].diff() / dt
vy = player_traces[f'{player}_y'].diff() / dt

raw_speeds = np.sqrt(vx ** 2 + vy ** 2)
lim_speed = 12
vx[raw_speeds > lim_speed] = np.nan
vy[raw_speeds > lim_speed] = np.nan

plt.figure(figsize=(15, 5))
plt.rcParams.update({'font.size': 15})
plt.plot(player_traces['time'][10000:12000], raw_speeds[10000:12000])
plt.xlabel('Time [s]')
plt.ylabel('Speed [m/s]')
plt.show()

##### (3) 사비츠키-골레이 필터(Savitzky-Golay filter)를 활용한 속도/속력 스무딩(smoothing)

In [None]:
vx = signal.savgol_filter(vx, window_length=13, polyorder=1)
vy = signal.savgol_filter(vy, window_length=13, polyorder=1)
speeds = np.sqrt(vx ** 2 + vy ** 2)

plt.figure(figsize=(15, 5))
plt.rcParams.update({'font.size': 15})
plt.plot(player_traces['time'][10000:12000], speeds[10000:12000])
plt.xlabel('Time [s]')
plt.ylabel('Speed [m/s]')
plt.show()

##### (4) 선수별 속도/속력 산출 함수 구현

In [None]:
def calc_running_features(traces, lim_speed=12, smoothing=True, window_length=13, polyorder=1):
    players = [c[:-2] for c in traces.columns if c[0] in ['H', 'A'] and c.endswith('_x')]

    for period in traces['period'].unique():
        period_traces = traces[traces['period'] == period]

        idx = period_traces.index
        dt = period_traces['time'].diff()

        for player in players:
            vx = period_traces[f'{player}_x'].diff() / dt
            vy = period_traces[f'{player}_y'].diff() / dt
            
            raw_speeds = np.sqrt(vx ** 2 + vy ** 2)
            vx[raw_speeds > lim_speed] = np.nan
            vy[raw_speeds > lim_speed] = np.nan
            vx = vx.interpolate()
            vy = vy.interpolate()

            if smoothing:
                vx = signal.savgol_filter(vx, window_length=13, polyorder=1)
                vy = signal.savgol_filter(vy, window_length=13, polyorder=1)
            
            traces.loc[idx, f'{player}_vx'] = vx
            traces.loc[idx, f'{player}_vy'] = vy
            traces.loc[idx, f'{player}_speed'] = np.sqrt(vx ** 2 + vy ** 2)

    return traces

In [None]:
traces = calc_running_features(traces)
traces

In [None]:
traces.to_csv('data_metrica/Sample_Game_1/Sample_Game_1_IntegratedData.csv')

##### (5) 여러 경기 위치 추적 데이터 가공 및 저장

In [None]:
matches = [d for d in os.listdir('data_metrica') if not d.startswith('.')]
matches.sort()
matches

In [None]:
for match in matches:
    home_file = f'data_metrica/{match}/{match}_RawTrackingData_Home_Team.csv'
    away_file = f'data_metrica/{match}/{match}_RawTrackingData_Away_Team.csv'
    event_file = f'data_metrica/{match}/{match}_RawEventsData.csv'
    try:
        home_traces = pd.read_csv(home_file, header=[0, 1, 2])
        away_traces = pd.read_csv(away_file, header=[0, 1, 2])
        events = pd.read_csv(event_file, header=0)
    except FileNotFoundError:
        continue

    home_players = [c[2] for c in home_traces.columns[3:-2:2]]
    home_trace_cols = [[f'H{int(p[6:]):02d}_x', f'H{int(p[6:]):02d}_y'] for p in home_players]
    home_trace_cols = np.array(home_trace_cols).flatten().tolist()
    home_traces.columns = ['period', 'frame', 'time'] + home_trace_cols + ['ball_x', 'ball_y']
    home_traces = home_traces.set_index('frame').astype(float)
    home_traces['period'] = home_traces['period'].astype(int)

    away_players = [c[2] for c in away_traces.columns[3:-2:2]]
    away_trace_cols = [[f'A{int(p[6:]):02d}_x', f'A{int(p[6:]):02d}_y'] for p in away_players]
    away_trace_cols = np.array(away_trace_cols).flatten().tolist()
    away_traces.columns = ['period', 'frame', 'time'] + away_trace_cols + ['ball_x', 'ball_y']
    away_traces = away_traces.set_index('frame').astype(float)
    away_traces['period'] = away_traces['period'].astype(int)
    
    cols = home_traces.columns[:-2].tolist() + away_traces.columns[2:].tolist()
    traces = pd.merge(home_traces, away_traces)[cols]
    traces.index = home_traces.index.astype(int)

    x_cols = [c for c in traces.columns if c.endswith('_x')]
    y_cols = [c for c in traces.columns if c.endswith('_y')]
    traces.loc[traces['period'] == 2, x_cols + y_cols] = 1 - traces.loc[traces['period'] == 2, x_cols + y_cols]
    traces[x_cols] *= 104
    traces[y_cols] *= 68

    events.loc[events['Subtype'].isna(), 'Subtype'] = events.loc[events['Subtype'].isna(), 'Type']
    for i, event in events.iterrows():
        start_frame = event['Start Frame']
        end_frame = event['End Frame']
        traces.loc[start_frame:end_frame-1, 'event_player'] = event['From']
        traces.loc[start_frame:end_frame-1, 'event_type'] = event['Type']
        traces.loc[start_frame:end_frame-1, 'event_subtype'] = event['Subtype']
    
    traces = calc_running_features(traces)
    traces.to_csv(f'data_metrica/{match}/{match}_IntegratedData.csv')
    print(f'Integrated data saved for {match}.')

### 경기 장면 애니메이션 시각화

##### (1) 속도 벡터 포함 특정 시점 이미지 시각화

In [None]:
frame = 1000

file = f'data_metrica/Sample_Game_{match_id}/Sample_Game_{match_id}_IntegratedData.csv'
traces = pd.read_csv(file, header=0, index_col=0)
data = traces.loc[frame]

fig, ax = draw_pitch(pitch='white', line='black')

for team, color in zip(['H', 'A'], ['r', 'b']):
    x_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_x')]
    y_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_y')]
    ax.scatter(data[x_cols], data[y_cols], s=100, c=color, alpha=0.7)
    
    for x, y in zip(x_cols, y_cols):
        if not (np.isnan(data[x]) or np.isnan(data[y])):
            ax.text(data[x] + 0.5, data[y] + 0.5, int(x[1:3]), fontsize=13, color=color)
    
    vx_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_vx')]
    vy_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_vy')]
    ax.quiver(
        data[x_cols].astype(float), data[y_cols].astype(float),
        data[vx_cols].astype(float), data[vy_cols].astype(float),
        color=color, scale=8, scale_units='inches', width=0.002, alpha=0.7
    )

ax.scatter(data['ball_x'], data['ball_y'], s=80, color='w', edgecolors='k')

time_text = f"{int(data['time'] // 60):02d}:{data['time'] % 60:05.2f}"
if not pd.isnull(data['event_subtype']):
    event_text = f"{data['event_subtype']} by {data['event_player']}" 
else:
    event_text = ''
ax.text(51, 67, time_text, fontsize=15, ha='right', va='top')
ax.text(53, 67, event_text, fontsize=15, ha='left', va='top')
    
plt.show()

##### (2) 시점별 이미지 시각화 함수 구현

In [None]:
def plot_snapshot(
    data, figax=None, team_colors=('r', 'b'),
    annotate_players=True, annotate_events=True, show_velocities=True
):
    if figax is None:
        fig, ax = draw_pitch(pitch='white', line='black')
    else:
        fig, ax = figax

    figobjs = []
    for team, color in zip(['H', 'A'], team_colors):
        x_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_x')]
        y_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_y')]
        obj = ax.scatter(data[x_cols], data[y_cols], s=100, c=color, alpha=0.7)
        figobjs.append(obj)

        if show_velocities:
            vx_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_vx')]
            vy_cols = [c for c in data.keys() if c.startswith(team) and c.endswith('_vy')]
            obj = ax.quiver(
                data[x_cols].astype(float), data[y_cols].astype(float),
                data[vx_cols].astype(float), data[vy_cols].astype(float),
                color=color, scale=8, scale_units='inches', width=0.002, alpha=0.7
            )
            figobjs.append(obj)
        
        if annotate_players:
            for x, y in zip(x_cols, y_cols):
                if not (np.isnan(data[x]) or np.isnan(data[y])):
                    obj = ax.text(data[x] + 0.5, data[y] + 0.5, int(x[1:3]), fontsize=13, color=color)
                    figobjs.append(obj)

    time_text = f"{int(data['time'] // 60):02d}:{data['time'] % 60:05.2f}"
    if annotate_events:
        if not pd.isnull(data['event_subtype']):
            event_text = f"{data['event_subtype']} by {data['event_player']}" 
        else:
            event_text = ''
        figobjs.append(ax.text(51, 67, time_text, fontsize=15, ha='right', va='top'))
        figobjs.append(ax.text(53, 67, event_text, fontsize=15, ha='left', va='top'))
    else:
        figobjs.append(ax.text(52, 67, time_text, fontsize=15, ha='center', va='top'))
        
    obj = ax.scatter(data['ball_x'], data['ball_y'], s=80, color='w', edgecolors='k')
    figobjs.append(obj)

    ax.set_xlim(-10, 114)
    ax.set_ylim(-7, 75)
    
    return fig, ax, figobjs

In [None]:
frame = 1000
fig, ax, figobjs = plot_snapshot(traces.loc[frame])
plt.show()

figobjs

##### (3) 경기 장면 애니메이션 시각화 함수 구현

In [None]:
def save_clip(
    clip_traces, fname='test', fps=25, figax=None, team_colors=('r', 'b'),
    annotate_players=True, annotate_events=True, show_velocities=True
):
    metadata = dict(title='Tracking Data', artist='Matplotlib', comment='Metrica tracking data clip')
    writer = animation.FFMpegWriter(fps=fps, metadata=metadata)

    if not os.path.exists('match_clips'):
        os.makedirs('match_clips')
    file = f'match_clips/{fname}.mp4'

    if figax is None:
        fig, ax = draw_pitch(pitch='white', line='black')
    else:
        fig, ax = figax
    fig.set_tight_layout(True)

    with writer.saving(fig, file, dpi=100):
        for i in clip_traces.index:
            frame_data = clip_traces.loc[i]
            fig, ax, figobjs = plot_snapshot(
                frame_data, (fig, ax), team_colors,
                annotate_players, annotate_events, show_velocities
            )
            writer.grab_frame()

            for obj in figobjs:
                obj.remove()

    plt.clf()
    plt.close(fig)

In [None]:
clip_traces = traces[:1000]
save_clip(clip_traces)