In [1]:
#我已經有一個函數extract_baseball_trajectory_from_video將影片pitch_0001.mp4丟進去
#trajectory = extract_baseball_trajectory_from_video(video_path='/content/pitch_0001.mp4', model=model, conf_threshold=0.5, baseball_class_name="baseball")
#他會輸出trajectory再丟到extract_longest_valid_segment(trajectory)會回傳longest_segment(代表真正的投球軌跡)
#我想做的事情是我有一個dir裡面放很多的mp4影片從pitch_0001.mp4到pitch_xxxx.mp4都有
#請你自動幫我把每一筆影片的longest_segment都抽出來當作這筆樣本的X
#再根據影片名例如pitch_0001.mp4 去我指定的csv檔案裏面
#找到Filename等於影片名的那一行對應的description當作y
#最後製作成batch_X(list of 每一個影片的投球軌跡),batch_y(list of 每一個影片的description)


In [2]:
#!pip install baseballcv ultralytics

In [3]:

import cv2
import matplotlib.pyplot as plt
import numpy as np

def extract_baseball_trajectory_from_video(video_path, model, conf_threshold=0.5, baseball_class_name="baseball"):
    """
    從影片逐幀偵測棒球位置，回傳軌跡座標列表，並畫出軌跡圖。

    參數：
        video_path: str，影片檔案路徑
        model: 已載入的 YOLOv8 模型物件
        conf_threshold: float，信心度門檻，低於此忽略
        baseball_class_name: str，棒球類別名稱（依你模型的類別而定）

    回傳：
        trajectory: list of (frame_index, x_center, y_center)
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"無法開啟影片檔：{video_path}")

    trajectory = []
    frame_idx = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break  # 讀完了

        # YOLOv8 預測，model 物件直接丟 np.ndarray 也行
        results = model(frame,verbose=False)  # 取得 list[Results]

        # 因為只處理一張圖，所以取 results[0]
        res = results[0]

        # 取得類別名稱與boxes/conf
        names = res.names
        boxes = res.boxes.xyxy.cpu().numpy()
        scores = res.boxes.conf.cpu().numpy()
        class_ids = res.boxes.cls.cpu().numpy().astype(int)

        # 找出棒球位置（第一個符合信心度且是棒球的框）
        ball_found = False
        for box, score, cls_id in zip(boxes, scores, class_ids):
            if score < conf_threshold:
                continue
            label = names.get(cls_id, str(cls_id))
            if label == baseball_class_name:
                x1, y1, x2, y2 = box.astype(int)  # <-- 修正這行
                x_center = (x1 + x2) / 2
                y_center = (y1 + y2) / 2
                trajectory.append((frame_idx, x_center, y_center))
                ball_found = True
                # 畫框與標籤
                text = f"{label} {score:.2f}"
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, text, (x1, y1 - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
                break

        # 如果該幀沒找到棒球，也可以記錄 None 或忽略
        if not ball_found:
            trajectory.append((frame_idx, None, None))

        # 準備進下一個迴圈
        frame_idx += 1

    # 釋放
    cap.release()
    return trajectory

In [5]:
def extract_longest_valid_segment(trajectory):

    max_len = 0
    current = []
    best = []

    for item in trajectory:
        frame_idx, x, y = item
        if x is not None and y is not None:
            current.append(item)
        else:
            if len(current) > max_len:
                max_len = len(current)
                best = current
            current = []
    # 最後一段也可能是最長的
    if len(current) > max_len:
        best = current

    if best:
        start_idx = best[0][0]
        end_idx = best[-1][0]
        print(f"選擇的段落：從 frame {start_idx} 到 frame {end_idx}，共 {len(best)} 幀")
    else:
        print("找不到有效段落")
    return best

In [None]:
import cv2
from ultralytics import YOLO
from baseballcv.functions import LoadTools
from tqdm import tqdm
import cv2

# 載入模型
load_tools = LoadTools()
model_path = load_tools.load_model("ball_tracking")
model = YOLO(model_path)

In [7]:
import os
import pandas as pd
import pickle
from tqdm import tqdm

def load_data_from_videos_and_csv(video_dir, csv_path, output_dir, model, conf_threshold=0.5, baseball_class_name="baseball"):
    # 讀 CSV
    df = pd.read_csv(csv_path)

    batch_X = []
    batch_y = []

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    video_files = [f for f in os.listdir(video_dir) if f.lower().endswith('.mp4')]
    video_files.sort()  # 如果要排序

    for vf in tqdm(video_files):
        video_path = os.path.join(video_dir, vf)
        print(f"處理影片: {vf}")

        # 抽取軌跡
        trajectory = extract_baseball_trajectory_from_video(video_path, model, conf_threshold, baseball_class_name)

        # 取得最長有效段
        #longest_segment = extract_longest_valid_segment(trajectory)

        # 暫時先直接用全部軌跡 後續再處理有效段的問題 因為還不知道怎麼處理有效段
        longest_segment = trajectory

        # 若找不到有效段，跳過或記空
        if not longest_segment:
            print(f"警告: {vf} 找不到有效投球軌跡段，跳過")
            continue

        # 尋找對應label
        row = df[df['Filename'] == vf]
        if row.empty:
            print(f"警告: {vf} 在 CSV 中找不到對應標籤，跳過")
            continue
        description = row.iloc[0]['description']

        # 將軌跡段和label加入 batch
        batch_X.append(longest_segment)  # 每個元素是list of (frame_idx,x,y)
        batch_y.append(description)

        # 存檔 pickle 格式：影片名_baseball_trajectory_with_description.pkl
        save_path = os.path.join(output_dir, f"{os.path.splitext(vf)[0]}_baseball_trajectory_with_description.pkl")
        with open(save_path, 'wb') as f:
            pickle.dump({
                'trajectory': longest_segment,
                'description': description
            }, f)
        print(f"已存檔: {save_path}")

    return batch_X, batch_y


In [None]:
# 範例用法
player_name_list = [
    'Shohei_Ohtani_SL','Shohei_Ohtani_FS','Shohei_Ohtani_FF',
    'Gerrit_Cole_CH','Gerrit_Cole_FF','Gerrit_Cole_SL'
]
for player_name in tqdm(player_name_list):
  video_dir = f'/content/drive/MyDrive/Baseball Movies/{player_name}_videos_4S'
  csv_path = f'/content/drive/MyDrive/Baseball Movies/data_csv/{player_name}.csv'
  output_dir = f'/content/drive/MyDrive/Baseball Movies/{player_name}_videos_4S/baseball_trajectory'
  batch_X, batch_y = load_data_from_videos_and_csv(video_dir, csv_path, output_dir, model)
  print(f"{player_name}:總共處理影片數: {len(batch_X)}")