In [1]:
# move to project root
%cd ..
%pwd

/Users/heste/workspace/soccernet/sn-script


'/Users/heste/workspace/soccernet/sn-script'

In [2]:
# 関連ファイル

# yolo の検出結果
src_csv_file = "database/demo/yolo_ball_tracking_results.csv"

# 1つの画像につき高々1つのボールを選択
single_csv_file = "database/demo/ball_tracking_single.csv"

# 補完
interpolated_csv_file = "database/demo/ball_tracking_interpolated.csv"

# ファイルパス
input_player_csv = "database/demo/players_in_frames_sn_gamestate.csv"
sample_metadata_file = "database/demo/sample_metadata.csv"

# 出力
sub_output_player_csv = "database/demo/players_in_frames_sn_gamestate_with_image_id.csv"
output_player_csv = "database/demo/players_in_frames_with_ball.csv"

## 可視化

In [3]:
import cv2
import pandas as pd
import os

# CSVファイルの読み込み
def visualize(csv_path):
    df = pd.read_csv(csv_path)

    # 動画ファイルが入っているディレクトリ（例：video_dir）
    video_dir = "/Users/heste/Downloads/video"

    # CSVに含まれる各動画ごとに処理
    video_ids = df["video_id"].unique()
    for video_id in sorted(video_ids):
        # video_idは "SNGS-0010" のような形式なので、番号部分を取り出して4桁のファイル名に変換
        video_number = video_id.split("-")[1]  # 例："0010"
        video_path = os.path.join(video_dir, f"{video_number}.mp4")
        if not os.path.exists(video_path):
            print(f"動画ファイルが見つかりません: {video_path}")
            continue

        cap = cv2.VideoCapture(video_path)
        frame_counter = 1  # CSVでの image_id は 1～750 などと対応していると仮定

        while True:
            ret, frame = cap.read()
            if not ret:
                break  # 動画の終了

            # CSVから該当フレームの検出結果を抽出
            frame_detections = df[(df["video_id"] == video_id) & (df["image_id"] == frame_counter)]
            for _, row in frame_detections.iterrows():
                # 検出結果が存在する場合のみ描画（NaNチェック）
                if pd.notnull(row["x1"]) and pd.notnull(row["y1"]) and pd.notnull(row["x2"]) and pd.notnull(row["y2"]):
                    x1 = int(row["x1"])
                    y1 = int(row["y1"])
                    x2 = int(row["x2"])
                    y2 = int(row["y2"])
                    # バウンディングボックスを描画
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frame, f"{x1},{y1},{x2},{y2}", (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # フレーム上に動画IDとフレーム番号を表示（オプション）
            cv2.putText(frame, f"{video_id} - Frame: {frame_counter}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 0, 0), 2)

            # フレームの表示（FPS=25の場合、待機時間は約40ms）
            cv2.imshow(f"Ball Tracking: {video_id}", frame)
            key = cv2.waitKey(40)  # 30ms待機。 'q' キーで中断可能
            if key == ord('q'):
                break

            frame_counter += 1

        cap.release()
        cv2.destroyWindow(f"Ball Tracking: {video_id}")

    cv2.destroyAllWindows()

## 1つの画像につき高々1つのボールを選択

In [None]:
visualize(src_csv_file)

[K_mat さんの kaggle DFL 2nd place solution](https://www.kaggle.com/competitions/dfl-bundesliga-data-shootout/discussion/360097)を参考に、距離と確信度から適切ボールを選択します

In [3]:
#
import pandas as pd
import numpy as np
import os
from math import sqrt

# パラメータ設定
alpha = 100.0       # 距離コストの重み（調整が必要）
missing_penalty = 10.0  # 検出がない場合のペナルティコスト

# CSV読み込み（ball_tracking_results.csv）
df = pd.read_csv(src_csv_file)

# 動画ごとに処理（各動画内でフレーム番号は image_id としてあると仮定）
results_tracking = []

for video_id, group in df.groupby("video_id"):
    # 各フレーム番号（画像ID）を昇順に取得
    frames = sorted(group["image_id"].unique())

    # 各フレームごとに候補リストを作成
    # 候補は検出がある場合、辞書型 {x1, y1, x2, y2, center, conf} として保持
    # 検出がないフレームはダミー候補として None を入れる
    frame_candidates = {}
    for f in frames:
        candidates = []
        df_frame = group[group["image_id"] == f]
        for idx, row in df_frame.iterrows():
            if pd.notnull(row["x1"]):
                # バウンディングボックス中心を計算（ここでは単純平均）
                cx = (row["x1"] + row["x2"]) / 2
                cy = (row["y1"] + row["y2"]) / 2
                # confidence列が存在する前提
                conf_val = row["conf"]
                candidates.append({
                    "x1": row["x1"],
                    "y1": row["y1"],
                    "x2": row["x2"],
                    "y2": row["y2"],
                    "center": (cx, cy),
                    "conf": conf_val
                })
        if len(candidates) == 0:
            # 検出がなければダミー候補を追加
            candidates.append(None)
        frame_candidates[f] = candidates

    # 動的計画法による最適経路探索
    # dp[frame] には各候補に対する (累積コスト, 候補インデックス, 前フレームの候補インデックス) を保存
    dp = {}
    frames_sorted = frames
    # 初フレーム
    dp[frames_sorted[0]] = []
    for i, cand in enumerate(frame_candidates[frames_sorted[0]]):
        if cand is None:
            cost = missing_penalty
        else:
            cost = (1 - cand["conf"])
        dp[frames_sorted[0]].append((cost, i, None))

    # 2フレーム目以降
    for f_idx in range(1, len(frames_sorted)):
        f = frames_sorted[f_idx]
        dp[f] = []
        for j, cand in enumerate(frame_candidates[f]):
            if cand is None:
                cand_cost = missing_penalty
            else:
                cand_cost = (1 - cand["conf"])
            best_cost = float('inf')
            best_prev = None
            # 前フレームの各候補との遷移コストを評価
            for (prev_cost, prev_i, _) in dp[frames_sorted[f_idx-1]]:
                prev_cand = frame_candidates[frames_sorted[f_idx-1]][prev_i]
                # 両フレームとも検出候補があればユークリッド距離を計算、どちらかがNoneなら距離コストは0
                if prev_cand is None or cand is None:
                    trans_cost = 0
                else:
                    dx = cand["center"][0] - prev_cand["center"][0]
                    dy = cand["center"][1] - prev_cand["center"][1]
                    distance = sqrt(dx*dx + dy*dy)
                    trans_cost = alpha * distance
                total_cost = prev_cost + trans_cost + cand_cost
                if total_cost < best_cost:
                    best_cost = total_cost
                    best_prev = prev_i
            dp[f].append((best_cost, j, best_prev))

    # 最終フレームから最小コスト経路をバックトラックで復元
    best_path = {}
    last_frame = frames_sorted[-1]
    best_final = min(dp[last_frame], key=lambda x: x[0])
    best_index = best_final[1]
    best_path[last_frame] = best_index

    for f_idx in range(len(frames_sorted)-1, 0, -1):
        f = frames_sorted[f_idx]
        _, _, best_prev = dp[f][best_path[f]]
        best_path[frames_sorted[f_idx-1]] = best_prev

    # 最適な候補を各フレームに対して選択し、結果リストに追加
    for f in frames_sorted:
        chosen_candidate = frame_candidates[f][best_path[f]]
        if chosen_candidate is None:
            # ダミーの場合は、後ほど線形補完などを実施可能（ここではNaNで出力）
            results_tracking.append({
                "video_id": video_id,
                "image_id": f,
                "x1": np.nan,
                "y1": np.nan,
                "x2": np.nan,
                "y2": np.nan,
                "conf": np.nan
            })
        else:
            results_tracking.append({
                "video_id": video_id,
                "image_id": f,
                "x1": chosen_candidate["x1"],
                "y1": chosen_candidate["y1"],
                "x2": chosen_candidate["x2"],
                "y2": chosen_candidate["y2"],
                "conf": chosen_candidate["conf"]
            })

# 最終結果をCSVに保存（各画像につき1個のボール座標）
df_track = pd.DataFrame(results_tracking)
df_track.sort_values(["video_id", "image_id"], inplace=True)
df_track.to_csv(single_csv_file, index=False)
print(f"最適化されたボール追跡結果を {single_csv_file} に出力しました")


最適化されたボール追跡結果を database/demo/ball_tracking_single.csv に出力しました


## 線形補完

In [None]:
visualize(single_csv_file)

In [14]:
import pandas as pd
import numpy as np

max_gap_length = 25  # 補完する最大フレーム数

# CSV読み込み
df = pd.read_csv(single_csv_file)

# video_id と image_id でソート（image_idはフレーム番号として扱う）
df.sort_values(["video_id", "image_id"], inplace=True)
df.reset_index(drop=True, inplace=True)

# 各video_id毎に処理するためのリスト
df_list = []

# 各動画ごとにグループ分け
for _, group_loop_var in df.groupby("video_id"):
    group = group_loop_var.sort_values("image_id").reset_index(drop=True)
    # 補完対象は、ボール検出が有効な行（x1がNaNでない）とする
    valid_idx = group.index[group["x1"].notna()].tolist()

    # 連続する有効検出の間の欠損フレームについて補完処理
    for i in range(len(valid_idx) - 1):
        start_idx = valid_idx[i]
        end_idx   = valid_idx[i+1]
        # gapは両端を除いた欠損フレーム数
        gap = end_idx - start_idx - 1

        # もし欠損があり、かつその長さが25フレーム以内なら線形補完を実施
        if gap > 0 and gap <= max_gap_length:
            start_row = group.loc[start_idx]
            end_row   = group.loc[end_idx]
            frame_start = start_row["image_id"]
            frame_end   = end_row["image_id"]
            total_gap = frame_end - frame_start  # 実際のフレーム間隔（通常はend_idx-start_idx）

            # 各欠損フレームに対して補完
            for j in range(1, gap + 1):
                # 補完比率
                ratio = j / total_gap
                # 補完結果（各座標を線形補完）
                group.loc[start_idx + j, "x1"] = start_row["x1"] + ratio * (end_row["x1"] - start_row["x1"])
                group.loc[start_idx + j, "y1"] = start_row["y1"] + ratio * (end_row["y1"] - start_row["y1"])
                group.loc[start_idx + j, "x2"] = start_row["x2"] + ratio * (end_row["x2"] - start_row["x2"])
                group.loc[start_idx + j, "y2"] = start_row["y2"] + ratio * (end_row["y2"] - start_row["y2"])
                # 信頼度がある場合も線形補完
                group.loc[start_idx + j, "conf"] = start_row["conf"] + ratio * (end_row["conf"] - start_row["conf"])
    df_list.append(group)

# 各グループを結合して補完済みのデータフレームを生成
df_interp = pd.concat(df_list, ignore_index=True)

# 補完結果のCSVとして保存
df_interp.to_csv(interpolated_csv_file, index=False)
print(f"補完結果を {interpolated_csv_file} に保存しました")

補完結果を database/demo/ball_tracking_interpolated.csv に保存しました


In [15]:
visualize(interpolated_csv_file)

KeyboardInterrupt: 

## gsrの出力csvと紐つける

In [5]:
import pandas as pd
import numpy as np
# CSVファイルの読み込み
df_players = pd.read_csv(input_player_csv)
df_sample = pd.read_csv(sample_metadata_file)
df_ball = pd.read_csv(interpolated_csv_file)

# --- 型変換 ---
# df_players の time は object になっているため、数値に変換（数値文字列である前提）
if not os.path.exists(sub_output_player_csv):
    df_players['time'] = pd.to_numeric(df_players['time'], errors='coerce')
    df_players.dropna(subset=['time'], inplace=True)

# --- Step 1: players_in_frames_sn_gamestate に sample_metadata の id を sample_id として付与 ---
def find_closest_sample(row, df_sample):
    """
    同じ game, half で、row['time'] が [start, end] 内にあるか、
    あるいは start/end との差が最も小さいサンプルの id を返す。
    """
    # game, half が一致する候補を抽出
    candidates = df_sample[(df_sample['game'] == row['game']) & (df_sample['half'] == row['half'])]
    if candidates.empty:
        return np.nan
    if len(candidates) == 1:
        return candidates['id'].values[0]
    # プレイヤーの time とサンプルの [start, end] の関係で距離を計算
    def distance(sample):
        t = row['time']
        start = sample['start']
        end = sample['end']
        # time が [start, end] 内なら距離0、そうでなければ start/end との差の最小値
        if start <= t <= end:
            return 0
        else:
            return min(abs(t - start), abs(t - end))
    candidates = candidates.copy()
    candidates['dist'] = candidates.apply(distance, axis=1)
    best = candidates.loc[candidates['dist'].idxmin()]
    return best['id']

# 各プレイヤー行に対して sample_id を付与
if not os.path.exists(sub_output_player_csv):
    df_players['sample_id'] = df_players.apply(lambda row: find_closest_sample(row, df_sample), axis=1)

# sample_id のstart,endをdf_playersに追加
if not os.path.exists(sub_output_player_csv):
    df_players = pd.merge(df_players, df_sample[['id', 'start', 'end']], left_on='sample_id', right_on='id', how='left')
    df_players.drop(columns='id', inplace=True)
    df_players.rename(columns={'start': 'sample_start', 'end': 'sample_end'}, inplace=True)


# sample_id 毎にグループ化して image_id を付与し、グループごとに結合
if not os.path.exists(sub_output_player_csv):
    df_players.to_csv(sub_output_player_csv, index=False)
    print(f"{sub_output_player_csv} に sample_id と image_id を付与して保存しました")


# --- Step 2: players_in_frames と ball_tracking_interpolated.csv を結合 ---
def find_ball_detection(row, df_ball, tol=2):
    """
    プレイヤー側の行（すでに sample_id と image_id が付与されている）に対し、
    同じ video_id (＝ "SNGS-XXXX", XXXXは sample_id を4桁にしたもの) の候補から、
    row['image_id'] と ball_tracking の image_id の差が最も小さい行を探し、
    ボール座標 (x1, y1, x2, y2) および conf を返す。
    tol: 許容するフレーム差の閾値（ここでは例として2フレーム以内）
    """
    # sample_id を整数に変換し、4桁の文字列形式にして video_id を生成
    try:
        sample_id_int = int(row['sample_id'])
    except:
        return pd.Series([np.nan, np.nan, np.nan, np.nan, np.nan])
    video_id = f"SNGS-{sample_id_int:04d}"

    # video_id が一致する候補を抽出
    candidates = df_ball[df_ball['video_id'] == video_id]
    if candidates.empty:
        return pd.Series([np.nan, np.nan, np.nan, np.nan, np.nan])

    candidates = candidates.copy()
    # players側の image_id と ball_tracking の image_id の差を計算（絶対値）
    candidates['time_diff'] = (candidates['image_id'] - row['image_id']).abs()
    best = candidates.loc[candidates['time_diff'].idxmin()]
    # 許容差を超える場合は欠損とする
    if best['time_diff'] > tol:
        return pd.Series([np.nan, np.nan, np.nan, np.nan, np.nan])
    return pd.Series([best['x1'], best['y1'], best['x2'], best['y2'], best['conf']])

if os.path.exists(sub_output_player_csv):
    df_players = pd.read_csv(sub_output_player_csv)

# 各プレイヤー行に対して、対応するボール検出情報を付与
df_players[['ball_x1', 'ball_y1', 'ball_x2', 'ball_y2', 'ball_conf']] = \
    df_players.apply(lambda row: find_ball_detection(row, df_ball), axis=1)


# --- 結果を保存 ---
df_players.to_csv(output_player_csv, index=False)
print(f"結合結果を '{output_player_csv}' として保存しました")

database/demo/players_in_frames_sn_gamestate_with_image_id.csv に sample_id と image_id を付与して保存しました
結合結果を 'database/demo/players_in_frames_with_ball.csv' として保存しました


## 後処理

In [18]:
df_players = pd.read_csv(input_player_csv)

group = df_players.groupby(['game', 'half'])
for k,g in group:
    print(g["time"].unique().size)

562
395
642
676
553
347
704
546
516
659
749
425
394
673
624
670
470


In [6]:
df_players = pd.read_csv(output_player_csv)
# _720p　というsuffixを消す
cols = df_players.columns
df_players.columns = [col.replace("_720p", "") for col in cols]

# sample_id and image_id　でソート
df_players = df_players.sort_values(["sample_id", "image_id"])

# 座標をintにする
df_players['ball_x1'] = pd.to_numeric(df_players['ball_x1'], errors='coerce').round().astype("Int64")
df_players['ball_y1'] = pd.to_numeric(df_players['ball_y1'], errors='coerce').round().astype("Int64")
df_players['ball_x2'] = pd.to_numeric(df_players['ball_x2'], errors='coerce').round().astype("Int64")
df_players['ball_y2'] = pd.to_numeric(df_players['ball_y2'], errors='coerce').round().astype("Int64")
# .02f
df_players['ball_conf'] = df_players['ball_conf'].apply(lambda x: f"{x:.2f}")

df_players.drop(columns=['sample_start', 'sample_end'], inplace=True)
df_players.to_csv(output_player_csv, index=False)


In [4]:
import cv2
import pandas as pd
import numpy as np
import os

def visualize_gsr_ball(csv_path):
    # CSVファイルの読み込み
    df = pd.read_csv(csv_path)

    # sample_id (数値) を "SNGS-XXXX" の形式に変換して video_id 列として追加
    df["video_id"] = df["sample_id"].apply(lambda x: f"SNGS-{int(x):04d}" if pd.notnull(x) else np.nan)

    player_flag = True
    ball_flag = all(col in df.columns for col in ["ball_x1", "ball_y1", "ball_x2", "ball_y2", "ball_conf"])

    # 動画ファイルが入っているディレクトリ（例：video_dir）
    video_dir = "/Users/heste/Downloads/video"

    # CSVに含まれる各動画ごとに処理
    video_ids = df["video_id"].dropna().unique()
    for video_id in sorted(video_ids):
        # video_idは "SNGS-0010" のような形式なので、番号部分を取り出して4桁のファイル名に変換
        video_number = video_id.split("-")[1]
        video_path = os.path.join(video_dir, f"{video_number}.mp4")
        if not os.path.exists(video_path):
            print(f"動画ファイルが見つかりません: {video_path}")
            continue

        cap = cv2.VideoCapture(video_path)
        frame_counter = 1  # CSVでの image_id と対応（1～750など）

        while True:
            ret, frame = cap.read()
            if not ret:
                break  # 動画終了

            # CSVから該当フレームの情報を抽出（video_id と image_id でフィルタ）
            frame_detections = df[(df["video_id"] == video_id) & (df["image_id"] == frame_counter)]
            for _, row in frame_detections.iterrows():
                # --- 選手の情報を描画 ---
                # 選手のバウンディングボックス (x1,y1)-(x2,y2) が存在する場合のみ描画
                if player_flag:
                    px1 = int(row["x1"])
                    py1 = int(row["y1"])
                    px2 = int(row["x2"])
                    py2 = int(row["y2"])
                    # 青色で選手のバウンディングボックス描画
                    cv2.rectangle(frame, (px1, py1), (px2, py2), (255, 0, 0), 2)
                    # 選手名を表示（青文字）
                    player_name = str(row["name"])
                    cv2.putText(frame, player_name, (px1, max(py1 - 10, 0)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

                # --- ボールの検出結果を描画 ---
                if ball_flag and pd.notnull(row["ball_x1"]):
                    bx1 = int(row["ball_x1"])
                    by1 = int(row["ball_y1"])
                    bx2 = int(row["ball_x2"])
                    by2 = int(row["ball_y2"])
                    # 緑色でボールのバウンディングボックス描画
                    cv2.rectangle(frame, (bx1, by1), (bx2, by2), (0, 255, 0), 2)
                    # ボールの信頼度を表示
                    conf_text = f"{row['ball_conf']:.2f}" if pd.notnull(row["ball_conf"]) else ""
                    cv2.putText(frame, conf_text, (bx1, max(by1 - 10, 0)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # フレーム上に動画IDとフレーム番号を表示（オプション）
            cv2.putText(frame, f"{video_id} - Frame: {frame_counter}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 0, 0), 2)

            cv2.imshow(f"Ball & Player Tracking: {video_id}", frame)
            key = cv2.waitKey(40)  # 約40ms待機（25fpsの場合）
            if key == ord('q'):
                break

            frame_counter += 1

        cap.release()
        cv2.destroyWindow(f"Ball & Player Tracking: {video_id}")

    cv2.destroyAllWindows()



In [5]:
# 使用例
visualize_gsr_ball(input_player_csv)